diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..2b21f65 --- /dev/null +++ b/.flake8 @@ -0,0 +1,2 @@ +[flake8] +max-line-length = 140 \ No newline at end of file diff --git a/.gitea/workflows/ci-cd.yml b/.gitea/workflows/ci-cd.yml new file mode 100644 index 0000000..a70fd61 --- /dev/null +++ b/.gitea/workflows/ci-cd.yml @@ -0,0 +1,400 @@ +name: CI/CD Pipeline + +on: + push: + branches: [ main, development, experimental ] + tags: [ 'v*.*.*' ] + pull_request: + branches: [ main ] + workflow_dispatch: + inputs: + skip_tests: + description: 'Skip tests' + required: false + default: 'false' + type: boolean + image_tag: + description: 'Custom tag for Docker image' + required: false + default: 'latest' + type: string + +jobs: + # ========================================== + # TESTING STAGE + # ========================================== + + unit-tests: + name: Unit Tests (Python ${{ matrix.python-version }}) + runs-on: ubuntu-latest + if: ${{ !inputs.skip_tests }} + strategy: + fail-fast: false + matrix: + python-version: ['3.9', '3.10', '3.11'] + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Cache pip dependencies + uses: actions/cache@v3 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-py${{ matrix.python-version }}-pip-${{ hashFiles('requirements.txt', 'requirements-test.txt') }} + restore-keys: | + ${{ runner.os }}-py${{ matrix.python-version }}-pip- + + - name: Install dependencies + run: | + python -m pip install --upgrade pip setuptools wheel + pip install -r requirements.txt + pip install -r requirements-test.txt + + - name: Create test configuration + run: | + mkdir -p embed logs + cat > config.ini << EOF + [Pterodactyl] + PanelURL = https://panel.example.com + ClientAPIKey = ptlc_test_client_key_123456789 + ApplicationAPIKey = ptla_test_app_key_987654321 + + [Discord] + Token 
= test_discord_token_placeholder + AllowedGuildID = 123456789 + EOF + + - name: Run unit tests with coverage + run: | + pytest test_pterodisbot.py \ + -v \ + --tb=short \ + --cov=pterodisbot \ + --cov=server_metrics_graphs \ + --cov-report=xml \ + --cov-report=term \ + --cov-report=html \ + --junitxml=test-results-${{ matrix.python-version }}.xml + + - name: Upload coverage to artifacts + uses: actions/upload-artifact@v3 + with: + name: coverage-report-py${{ matrix.python-version }} + path: | + coverage.xml + htmlcov/ + test-results-${{ matrix.python-version }}.xml + + code-quality: + name: Code Quality & Linting + runs-on: ubuntu-latest + if: ${{ !inputs.skip_tests }} + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.11' + + - name: Install linting tools + run: | + python -m pip install --upgrade pip + pip install flake8 pylint black isort mypy + + - name: Run flake8 + run: | + flake8 pterodisbot.py server_metrics_graphs.py \ + --max-line-length=140 \ + --ignore=E501,W503,E203 \ + --exclude=venv,__pycache__,build,dist \ + --statistics \ + --output-file=flake8-report.txt + continue-on-error: true + + - name: Run pylint + run: | + pylint pterodisbot.py server_metrics_graphs.py \ + --disable=C0111,C0103,R0913,R0914,R0915,W0718 \ + --max-line-length=140 \ + --output-format=text \ + --reports=y > pylint-report.txt || true + continue-on-error: true + + - name: Check code formatting with black + run: | + black --check --line-length=140 --diff pterodisbot.py server_metrics_graphs.py | tee black-report.txt + continue-on-error: true + + - name: Check import ordering + run: | + isort --check-only --profile black --line-length=140 pterodisbot.py server_metrics_graphs.py + continue-on-error: true + + - name: Type checking with mypy + run: | + mypy pterodisbot.py server_metrics_graphs.py --ignore-missing-imports > mypy-report.txt || true + continue-on-error: true + + - name: 
Upload linting reports + uses: actions/upload-artifact@v3 + with: + name: code-quality-reports + path: | + flake8-report.txt + pylint-report.txt + black-report.txt + mypy-report.txt + + security-scan: + name: Security Scanning + runs-on: ubuntu-latest + if: ${{ !inputs.skip_tests }} + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.11' + + - name: Install security tools + run: | + python -m pip install --upgrade pip + pip install bandit safety pip-audit + + - name: Run bandit security scan + run: | + bandit -r . \ + -f json \ + -o bandit-report.json \ + -ll \ + --exclude ./venv,./test_*.py,./tests + continue-on-error: true + + - name: Run safety dependency check + run: | + pip install -r requirements.txt + safety check --json --output safety-report.json || true + continue-on-error: true + + - name: Run pip-audit + run: | + pip-audit --desc --format json --output pip-audit-report.json || true + continue-on-error: true + + - name: Upload security reports + uses: actions/upload-artifact@v3 + with: + name: security-reports + path: | + bandit-report.json + safety-report.json + pip-audit-report.json + + integration-tests: + name: Integration Tests + runs-on: ubuntu-latest + needs: [unit-tests] + if: ${{ !inputs.skip_tests }} + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.11' + + - name: Cache dependencies + uses: actions/cache@v3 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-pip-integration-${{ hashFiles('requirements.txt') }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r requirements-test.txt + + - name: Create test configuration + run: | + mkdir -p embed logs + cat > config.ini << EOF + [Pterodactyl] + PanelURL = https://panel.example.com + ClientAPIKey = 
ptlc_test_client_key_123456789 + ApplicationAPIKey = ptla_test_app_key_987654321 + + [Discord] + Token = test_discord_token_placeholder + AllowedGuildID = 123456789 + EOF + + - name: Run integration tests + run: | + pytest test_pterodisbot.py::TestIntegration \ + -v \ + --tb=short \ + --timeout=60 + + # ========================================== + # BUILD STAGE + # ========================================== + + docker-build: + name: Build Docker Image + runs-on: ubuntu-latest + needs: [unit-tests, code-quality, security-scan] + if: | + always() && + (needs.unit-tests.result == 'success' || inputs.skip_tests) && + (github.event_name == 'push' || github.event_name == 'workflow_dispatch') + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + with: + platforms: arm64 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + with: + platforms: linux/amd64,linux/arm64 + driver-opts: | + image=moby/buildkit:latest + + - name: Log in to registry + uses: docker/login-action@v2 + with: + registry: ${{ vars.REGISTRY }} + username: ${{ secrets.REGISTRY_USERNAME }} + password: ${{ secrets.REGISTRY_PASSWORD }} + + - name: Generate Docker image tags + id: tags + run: | + IMAGE_NAME="${{ vars.REGISTRY }}/${{ github.repository_owner }}/${{ vars.IMAGE_NAME }}" + + if [ -n "${{ github.event.inputs.image_tag }}" ]; then + PRIMARY_TAG="${{ github.event.inputs.image_tag }}" + elif [[ ${{ github.ref }} == refs/tags/v* ]]; then + PRIMARY_TAG="${GITHUB_REF#refs/tags/}" + elif [[ ${{ github.ref }} == refs/heads/main ]]; then + PRIMARY_TAG="latest" + elif [[ ${{ github.ref }} == refs/heads/development ]]; then + PRIMARY_TAG="development" + elif [[ ${{ github.ref }} == refs/heads/experimental ]]; then + PRIMARY_TAG="experimental" + else + PRIMARY_TAG="latest" + fi + + TAGS="$IMAGE_NAME:$PRIMARY_TAG,$IMAGE_NAME:${{ github.sha }}" + + if [[ ${{ github.ref }} == refs/tags/v* ]]; then + MAJOR_MINOR_TAG=$(echo 
"$PRIMARY_TAG" | sed -E 's/^v([0-9]+\.[0-9]+)\.[0-9]+.*$/v\1/') + if [[ "$MAJOR_MINOR_TAG" != "$PRIMARY_TAG" ]]; then + TAGS="$TAGS,$IMAGE_NAME:$MAJOR_MINOR_TAG" + fi + + MAJOR_TAG=$(echo "$PRIMARY_TAG" | sed -E 's/^v([0-9]+)\.[0-9]+\.[0-9]+.*$/v\1/') + if [[ "$MAJOR_TAG" != "$PRIMARY_TAG" ]]; then + TAGS="$TAGS,$IMAGE_NAME:$MAJOR_TAG" + fi + fi + + echo "tags=$TAGS" >> $GITHUB_OUTPUT + echo "Generated tags: $TAGS" + + - name: Build and push multi-arch image + uses: docker/build-push-action@v4 + with: + context: . + platforms: linux/amd64,linux/arm64 + push: true + cache-from: type=registry,ref=${{ vars.REGISTRY }}/${{ github.repository_owner }}/${{ vars.IMAGE_NAME }}:cache + cache-to: type=registry,ref=${{ vars.REGISTRY }}/${{ github.repository_owner }}/${{ vars.IMAGE_NAME }}:cache,mode=max + tags: ${{ steps.tags.outputs.tags }} + labels: | + org.opencontainers.image.source=${{ github.server_url }}/${{ github.repository }} + org.opencontainers.image.revision=${{ github.sha }} + org.opencontainers.image.created=${{ github.event.head_commit.timestamp }} + + # ========================================== + # REPORTING STAGE + # ========================================== + + test-report: + name: Generate Test Report + runs-on: ubuntu-latest + needs: [unit-tests, code-quality, security-scan, integration-tests] + if: always() && !inputs.skip_tests + + steps: + - name: Download all artifacts + uses: actions/download-artifact@v3 + + - name: Generate test summary + run: | + echo "## ๐Ÿงช Test Results Summary" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + echo "### Job Status:" >> $GITHUB_STEP_SUMMARY + echo "- โœ… Unit Tests: \`${{ needs.unit-tests.result }}\`" >> $GITHUB_STEP_SUMMARY + echo "- ๐ŸŽจ Code Quality: \`${{ needs.code-quality.result }}\`" >> $GITHUB_STEP_SUMMARY + echo "- ๐Ÿ”’ Security Scan: \`${{ needs.security-scan.result }}\`" >> $GITHUB_STEP_SUMMARY + echo "- ๐Ÿ”— Integration Tests: \`${{ needs.integration-tests.result }}\`" >> 
$GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + echo "### Artifacts Generated:" >> $GITHUB_STEP_SUMMARY + echo "- Coverage reports (HTML & XML)" >> $GITHUB_STEP_SUMMARY + echo "- Code quality reports (flake8, pylint, black)" >> $GITHUB_STEP_SUMMARY + echo "- Security scan reports (bandit, safety)" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + echo "**Commit:** \`${{ github.sha }}\`" >> $GITHUB_STEP_SUMMARY + echo "**Branch:** \`${{ github.ref_name }}\`" >> $GITHUB_STEP_SUMMARY + echo "**Triggered by:** ${{ github.actor }}" >> $GITHUB_STEP_SUMMARY + + final-status: + name: CI/CD Pipeline Status + runs-on: ubuntu-latest + needs: [unit-tests, test-report, docker-build] + if: always() + + steps: + - name: Check pipeline status + run: | + echo "## ๐Ÿš€ CI/CD Pipeline Complete" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + + if [[ "${{ needs.docker-build.result }}" == "success" ]]; then + echo "โœ… **Docker image built and pushed successfully**" >> $GITHUB_STEP_SUMMARY + elif [[ "${{ needs.docker-build.result }}" == "skipped" ]]; then + echo "โญ๏ธ **Docker build skipped**" >> $GITHUB_STEP_SUMMARY + else + echo "โŒ **Docker build failed**" >> $GITHUB_STEP_SUMMARY + fi + + echo "" >> $GITHUB_STEP_SUMMARY + echo "**Pipeline run:** ${{ github.run_number }}" >> $GITHUB_STEP_SUMMARY + echo "**Workflow:** ${{ github.workflow }}" >> $GITHUB_STEP_SUMMARY + + - name: Fail if critical jobs failed + if: | + (needs.unit-tests.result == 'failure' && !inputs.skip_tests) || + needs.docker-build.result == 'failure' + run: exit 1 \ No newline at end of file diff --git a/.gitea/workflows/docker-build.yml b/.gitea/workflows/docker-build.yml deleted file mode 100644 index c5feafb..0000000 --- a/.gitea/workflows/docker-build.yml +++ /dev/null @@ -1,89 +0,0 @@ -name: Docker Build and Push (Multi-architecture) - -on: - push: - branches: [ main, experimental ] - tags: [ 'v*.*.*' ] - workflow_dispatch: - inputs: - image_tag: - description: 'Custom tag for the Docker 
image' - required: true - default: 'latest' - type: string - -jobs: - build-and-push: - runs-on: ubuntu-latest - steps: - - name: Checkout code - uses: actions/checkout@v3 - - - name: Set up QEMU - uses: docker/setup-qemu-action@v2 - with: - platforms: arm64 - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 - with: - platforms: linux/amd64,linux/arm64 - driver-opts: | - image=moby/buildkit:latest - - - name: Log in to registry - uses: docker/login-action@v2 - with: - registry: ${{ vars.REGISTRY }} - username: ${{ secrets.REGISTRY_USERNAME }} - password: ${{ secrets.REGISTRY_PASSWORD }} - - - name: Generate Docker image tags - id: tags - run: | - # Base image name - IMAGE_NAME="${{ vars.REGISTRY }}/${{ github.repository_owner }}/${{ vars.IMAGE_NAME }}" - - # Determine primary tag - if [ -n "${{ github.event.inputs.image_tag }}" ]; then - PRIMARY_TAG="${{ github.event.inputs.image_tag }}" - elif [[ ${{ github.ref }} == refs/tags/v* ]]; then - PRIMARY_TAG="${GITHUB_REF#refs/tags/}" - elif [[ ${{ github.ref }} == refs/heads/main ]]; then - PRIMARY_TAG="latest" - elif [[ ${{ github.ref }} == refs/heads/experimental ]]; then - PRIMARY_TAG="experimental" - else - PRIMARY_TAG="latest" - fi - - # Start with primary tag and SHA tag - TAGS="$IMAGE_NAME:$PRIMARY_TAG,$IMAGE_NAME:${{ github.sha }}" - - # Add version tags for releases - if [[ ${{ github.ref }} == refs/tags/v* ]]; then - # Add major.minor tag (e.g., v1.2 for v1.2.3) - MAJOR_MINOR_TAG=$(echo "$PRIMARY_TAG" | sed -E 's/^v([0-9]+\.[0-9]+)\.[0-9]+.*$/v\1/') - if [[ "$MAJOR_MINOR_TAG" != "$PRIMARY_TAG" ]]; then - TAGS="$TAGS,$IMAGE_NAME:$MAJOR_MINOR_TAG" - fi - - # Add major tag (e.g., v1 for v1.2.3) - MAJOR_TAG=$(echo "$PRIMARY_TAG" | sed -E 's/^v([0-9]+)\.[0-9]+\.[0-9]+.*$/v\1/') - if [[ "$MAJOR_TAG" != "$PRIMARY_TAG" ]]; then - TAGS="$TAGS,$IMAGE_NAME:$MAJOR_TAG" - fi - fi - - echo "tags=$TAGS" >> $GITHUB_OUTPUT - echo "Generated tags: $TAGS" - - - name: Build and push multi-arch image - uses: 
docker/build-push-action@v4 - with: - context: . - platforms: linux/amd64,linux/arm64 - push: true - cache-from: type=registry,ref=${{ vars.REGISTRY }}/${{ github.repository_owner }}/${{ vars.IMAGE_NAME }}:cache - cache-to: type=registry,ref=${{ vars.REGISTRY }}/${{ github.repository_owner }}/${{ vars.IMAGE_NAME }}:cache,mode=max - tags: ${{ steps.tags.outputs.tags }} \ No newline at end of file diff --git a/.gitignore b/.gitignore index f184e86..6f18f38 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,14 @@ __pycache__/ *.py[cod] *$py.class +# IDEs +.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store + # C extensions *.so @@ -37,20 +45,33 @@ MANIFEST pip-log.txt pip-delete-this-directory.txt -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ +# Testing +__pycache__/ +*.py[cod] +*$py.class +*.so +.pytest_cache/ .coverage .coverage.* -.cache -nosetests.xml +htmlcov/ coverage.xml *.cover -*.py,cover .hypothesis/ -.pytest_cache/ -cover/ +.tox/ +.nox/ + +# Test reports +test-results*.xml +junit*.xml +*-report.txt +*-report.json +bandit-report.json +safety-report.json +pip-audit-report.json +flake8-report.txt +pylint-report.txt +black-report.txt +mypy-report.txt # Translations *.mo @@ -83,37 +104,7 @@ target/ profile_default/ ipython_config.py -# pyenv -# For a library or package, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# .python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# UV -# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. 
-# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -#uv.lock - -# poetry -# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock - # pdm -# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -#pdm.lock -# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it -# in version control. -# https://pdm.fming.dev/latest/usage/project/#working-with-version-control .pdm.toml .pdm-python .pdm-build/ @@ -161,13 +152,6 @@ dmypy.json # Cython debug symbols cython_debug/ -# PyCharm -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore -# and can be added to the global gitignore or merged into this file. For a more nuclear -# option (not recommended) you can uncomment the following to ignore the entire idea folder. 
-#.idea/ - # Ruff stuff: .ruff_cache/ diff --git a/pterodisbot.py b/pterodisbot.py index f6a0aa9..fe00b25 100644 --- a/pterodisbot.py +++ b/pterodisbot.py @@ -13,55 +13,52 @@ Features: - Extensive logging for all operations """ -import discord -from discord.ext import commands, tasks -from discord import app_commands -import os -import sys -import signal -import types -import aiohttp import asyncio -import json -import traceback -import logging -from logging.handlers import RotatingFileHandler import configparser +import json +import logging +import os +import signal +import sys +import types from datetime import datetime -from typing import Dict, List, Optional, Tuple +from logging.handlers import RotatingFileHandler from pathlib import Path +from typing import Dict, List, Optional, Tuple + +import aiohttp +import discord import matplotlib -matplotlib.use('Agg') # Use non-interactive backend for server environments -import matplotlib.pyplot as plt -import matplotlib.dates as mdates -from collections import deque -import io -from server_metrics_graphs import ServerMetricsGraphs, ServerMetricsManager +from discord.ext import commands, tasks + +from server_metrics_graphs import ServerMetricsManager + +matplotlib.use("Agg") # Use non-interactive backend for server environments # ============================================== # LOGGING SETUP # ============================================== -logs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs') +logs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs") os.makedirs(logs_dir, exist_ok=True) -logger = logging.getLogger('pterodisbot') +logger = logging.getLogger("pterodisbot") logger.setLevel(logging.DEBUG) # File handler for logs (rotates when reaching 5MB, keeps 3 backups) handler = RotatingFileHandler( - filename=os.path.join(logs_dir, 'pterodisbot.log'), - maxBytes=5*1024*1024, # 5 MiB max log file size - backupCount=3, # Rotate through 3 files - encoding='utf-8' + 
filename=os.path.join(logs_dir, "pterodisbot.log"), + maxBytes=5 * 1024 * 1024, # 5 MiB max log file size + backupCount=3, # Rotate through 3 files + encoding="utf-8", ) -handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) +handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) logger.addHandler(handler) # Console handler for real-time output console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) -console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) +console_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) logger.addHandler(console_handler) logger.info("Initialized logging system with file and console output") @@ -74,86 +71,90 @@ logger.info("Initialized logging system with file and console output") # logger.debug("Gennerated config.ini file using values from .env") config = configparser.ConfigParser() -config.read('config.ini') +config.read("config.ini") # ============================================== # CONFIGURATION VALIDATION # ============================================== + class ConfigValidationError(Exception): """Custom exception for configuration validation errors.""" + pass + def validate_config(): """ Validate all required configuration values at startup. Raises ConfigValidationError if any required values are missing or invalid. 
""" errors = [] - + # Validate Pterodactyl section - if not config.has_section('Pterodactyl'): + if not config.has_section("Pterodactyl"): errors.append("Missing [Pterodactyl] section in config.ini") else: - required_ptero = ['PanelURL', 'ClientAPIKey', 'ApplicationAPIKey'] + required_ptero = ["PanelURL", "ClientAPIKey", "ApplicationAPIKey"] for key in required_ptero: - if not config.get('Pterodactyl', key, fallback=None): + if not config.get("Pterodactyl", key, fallback=None): errors.append(f"Missing required Pterodactyl config value: {key}") - + # Validate Discord section - if not config.has_section('Discord'): + if not config.has_section("Discord"): errors.append("Missing [Discord] section in config.ini") else: - required_discord = ['Token', 'AllowedGuildID'] + required_discord = ["Token", "AllowedGuildID"] for key in required_discord: - if not config.get('Discord', key, fallback=None): + if not config.get("Discord", key, fallback=None): errors.append(f"Missing required Discord config value: {key}") - + # Validate AllowedGuildID is a valid integer try: - guild_id = config.getint('Discord', 'AllowedGuildID', fallback=0) + guild_id = config.getint("Discord", "AllowedGuildID", fallback=0) if guild_id <= 0: errors.append("AllowedGuildID must be a positive integer") except ValueError: errors.append("AllowedGuildID must be a valid integer") - + # Validate API keys have correct prefixes - client_key = config.get('Pterodactyl', 'ClientAPIKey', fallback='') - if client_key and not client_key.startswith('ptlc_'): + client_key = config.get("Pterodactyl", "ClientAPIKey", fallback="") + if client_key and not client_key.startswith("ptlc_"): errors.append("ClientAPIKey should start with 'ptlc_'") - - app_key = config.get('Pterodactyl', 'ApplicationAPIKey', fallback='') - if app_key and not app_key.startswith('ptla_'): + + app_key = config.get("Pterodactyl", "ApplicationAPIKey", fallback="") + if app_key and not app_key.startswith("ptla_"): errors.append("ApplicationAPIKey 
should start with 'ptla_'") - + # Validate PanelURL is a valid URL - panel_url = config.get('Pterodactyl', 'PanelURL', fallback='') - if panel_url and not (panel_url.startswith('http://') or panel_url.startswith('https://')): + panel_url = config.get("Pterodactyl", "PanelURL", fallback="") + if panel_url and not (panel_url.startswith("http://") or panel_url.startswith("https://")): errors.append("PanelURL must start with http:// or https://") - + if errors: error_msg = "Configuration validation failed:\n- " + "\n- ".join(errors) logger.error(error_msg) raise ConfigValidationError(error_msg) - + logger.info("Configuration validation passed") + # ============================================== # CONSTANTS (Updated with validation) # ============================================== try: validate_config() - - PTERODACTYL_URL = config.get('Pterodactyl', 'PanelURL') - PTERODACTYL_CLIENT_API_KEY = config.get('Pterodactyl', 'ClientAPIKey') - PTERODACTYL_APPLICATION_API_KEY = config.get('Pterodactyl', 'ApplicationAPIKey') - DISCORD_TOKEN = config.get('Discord', 'Token') - ALLOWED_GUILD_ID = config.getint('Discord', 'AllowedGuildID') + + PTERODACTYL_URL = config.get("Pterodactyl", "PanelURL") + PTERODACTYL_CLIENT_API_KEY = config.get("Pterodactyl", "ClientAPIKey") + PTERODACTYL_APPLICATION_API_KEY = config.get("Pterodactyl", "ApplicationAPIKey") + DISCORD_TOKEN = config.get("Discord", "Token") + ALLOWED_GUILD_ID = config.getint("Discord", "AllowedGuildID") REQUIRED_ROLE = "Game Server User" UPDATE_INTERVAL = 10 EMBED_LOCATIONS_FILE = "./embed/embed_locations.json" - + logger.debug("Loaded and validated configuration values from config.ini") except ConfigValidationError as e: @@ -167,53 +168,60 @@ except Exception as e: # PTERODACTYL API CLASS # ============================================== + class PterodactylAPI: """ Handles all interactions with the Pterodactyl Panel API. Uses client API key for client endpoints and application API key for admin endpoints. 
Provides methods for server management and monitoring. """ - + def __init__(self, panel_url: str, client_api_key: str, application_api_key: str): """ Initialize the Pterodactyl API client with both API keys. - + Args: panel_url: URL of the Pterodactyl panel (must include protocol) client_api_key: API key for client endpoints (starts with ptlc_) application_api_key: API key for application endpoints (starts with ptla_) """ - self.panel_url = panel_url.rstrip('/') + self.panel_url = panel_url.rstrip("/") self.client_api_key = client_api_key self.application_api_key = application_api_key self.session = None self.lock = asyncio.Lock() # Prevents concurrent API access logger.info("Initialized PterodactylAPI client with provided credentials") - + async def initialize(self): """Initialize the aiohttp client session for API requests.""" self.session = aiohttp.ClientSession() logger.debug("Created new aiohttp ClientSession") - + async def close(self): """Cleanly close the aiohttp session when shutting down.""" if self.session and not self.session.closed: await self.session.close() logger.debug("Closed aiohttp ClientSession") - - async def _request(self, method: str, endpoint: str, data: Optional[dict] = None, use_application_key: bool = False) -> dict: + + async def _request( + self, + method: str, + endpoint: str, + data: Optional[dict] = None, + use_application_key: bool = False, + ) -> dict: """ Make an authenticated request to the Pterodactyl API. - + Args: method: HTTP method (GET, POST, PUT, DELETE, etc.) 
endpoint: API endpoint (e.g., 'application/servers') data: Optional JSON payload for POST/PUT requests use_application_key: Whether to use the application API key (admin endpoints) - + Returns: Dictionary containing API response or error information - + Raises: aiohttp.ClientError: For network-related issues json.JSONDecodeError: If response cannot be parsed as JSON @@ -221,150 +229,160 @@ class PterodactylAPI: url = f"{self.panel_url}/api/{endpoint}" api_key_type = "Application" if use_application_key else "Client" logger.debug(f"Preparing {method} request to {endpoint} using {api_key_type} API key") - + # Choose the appropriate API key api_key = self.application_api_key if use_application_key else self.client_api_key headers = { "Authorization": f"Bearer {api_key}", "Accept": "application/json", - "Content-Type": "application/json" + "Content-Type": "application/json", } - + try: async with self.lock: logger.debug(f"Acquired lock for API request to {endpoint}") - async with self.session.request( - method, - url, - headers=headers, - json=data - ) as response: + async with self.session.request(method, url, headers=headers, json=data) as response: if response.status == 204: # No content logger.debug(f"Received 204 No Content response from {endpoint}") return {"status": "success"} - + response_data = await response.json() logger.debug(f"Received response from {endpoint} with status {response.status}") - + if response.status >= 400: - error_msg = response_data.get('errors', [{}])[0].get('detail', 'Unknown error') + error_msg = response_data.get("errors", [{}])[0].get("detail", "Unknown error") logger.error(f"API request to {endpoint} failed with status {response.status}: {error_msg}") return {"status": "error", "message": error_msg} - + return response_data except Exception as e: logger.error(f"Exception during API request to {endpoint}: {str(e)}") return {"status": "error", "message": str(e)} - + async def get_servers(self) -> List[dict]: """ Get a list of all 
servers from the Pterodactyl panel. Uses application API key as this is an admin endpoint. - + Returns: List of server dictionaries containing all server attributes """ logger.info("Fetching list of all servers from Pterodactyl panel") response = await self._request("GET", "application/servers", use_application_key=True) - servers = response.get('data', []) + servers = response.get("data", []) logger.info(f"Retrieved {len(servers)} servers from Pterodactyl panel") return servers - + async def get_server_resources(self, server_id: str) -> dict: """ Get resource usage for a specific server. Uses client API key as this is a client endpoint. - + Args: server_id: The Pterodactyl server identifier - + Returns: Dictionary containing server resource usage and current state """ logger.debug(f"Fetching resource usage for server {server_id}") try: response = await self._request("GET", f"client/servers/{server_id}/resources") - if response.get('status') == 'error': - error_msg = response.get('message', 'Unknown error') + if response.get("status") == "error": + error_msg = response.get("message", "Unknown error") logger.error(f"Failed to get resources for server {server_id}: {error_msg}") - return {'attributes': {'current_state': 'offline'}} - - state = response.get('attributes', {}).get('current_state', 'unknown') + return {"attributes": {"current_state": "offline"}} + + state = response.get("attributes", {}).get("current_state", "unknown") logger.debug(f"Server {server_id} current state: {state}") return response except Exception as e: logger.error(f"Exception getting resources for server {server_id}: {str(e)}") - return {'attributes': {'current_state': 'offline'}} - + return {"attributes": {"current_state": "offline"}} + async def send_power_action(self, server_id: str, action: str) -> dict: """ Send a power action to a server (start/stop/restart). Uses client API key as this is a client endpoint. 
- + Args: server_id: The Pterodactyl server identifier action: Power action to send (start/stop/restart) - + Returns: Dictionary containing API response status """ - valid_actions = ['start', 'stop', 'restart'] + valid_actions = ["start", "stop", "restart"] if action not in valid_actions: logger.warning(f"Invalid power action attempted: {action}") - return {"status": "error", "message": f"Invalid action. Must be one of: {', '.join(valid_actions)}"} - + return { + "status": "error", + "message": f"Invalid action. Must be one of: {', '.join(valid_actions)}", + } + logger.info(f"Sending {action} command to server {server_id}") result = await self._request("POST", f"client/servers/{server_id}/power", {"signal": action}) - - if result.get('status') == 'success': + + if result.get("status") == "success": logger.info(f"Successfully executed {action} on server {server_id}") else: logger.error(f"Failed to execute {action} on server {server_id}: {result.get('message', 'Unknown error')}") - + return result - + async def get_server_details(self, server_id: str) -> dict: """ Get detailed server information including allocations. Uses application API key as this is an admin endpoint. - + Args: server_id: The Pterodactyl server identifier - + Returns: Dictionary containing detailed server information """ logger.debug(f"Fetching detailed information for server {server_id}") return await self._request("GET", f"application/servers/{server_id}", use_application_key=True) - + async def get_server_allocations(self, server_id: str) -> dict: """ Get allocation information for a server (IP addresses and ports). Uses application API key as this is an admin endpoint. 
- + Args: server_id: The Pterodactyl server identifier - + Returns: Dictionary containing server allocation information """ logger.debug(f"Fetching allocation information for server {server_id}") - return await self._request("GET", f"application/servers/{server_id}/allocations", use_application_key=True) + return await self._request( + "GET", + f"application/servers/{server_id}/allocations", + use_application_key=True, + ) + # ============================================== # SERVER STATUS VIEW CLASS (Buttons and UI) # ============================================== + class ServerStatusView(discord.ui.View): """ Interactive Discord view containing server control buttons. Provides persistent controls for server management with role-based access. """ - - def __init__(self, server_id: str, server_name: str, pterodactyl_api: PterodactylAPI, server_data: dict): + + def __init__( + self, + server_id: str, + server_name: str, + pterodactyl_api: PterodactylAPI, + server_data: dict, + ): """ Initialize the server status view with control buttons. - + Args: server_id: The server's Pterodactyl identifier server_name: Human-readable server name @@ -377,26 +395,23 @@ class ServerStatusView(discord.ui.View): self.api = pterodactyl_api self.server_data = server_data logger.debug(f"Created ServerStatusView for {server_name} ({server_id})") - + async def interaction_check(self, interaction: discord.Interaction) -> bool: """ Verify the interacting user has the required role and is in the allowed guild. 
- + Args: interaction: Discord interaction object - + Returns: bool: True if authorized, False otherwise """ # First check if interaction is from the allowed guild if interaction.guild_id != ALLOWED_GUILD_ID: logger.warning(f"Unauthorized interaction attempt from guild {interaction.guild_id}") - await interaction.response.send_message( - "This bot is only available in a specific server.", - ephemeral=True - ) + await interaction.response.send_message("This bot is only available in a specific server.", ephemeral=True) return False - + # Then check for required role logger.debug(f"Checking permissions for {interaction.user.name} on server {self.server_name}") has_role = any(role.name == REQUIRED_ROLE for role in interaction.user.roles) @@ -404,147 +419,145 @@ class ServerStatusView(discord.ui.View): logger.warning(f"Permission denied for {interaction.user.name} - missing '{REQUIRED_ROLE}' role") await interaction.response.send_message( f"You don't have permission to control servers. You need the '{REQUIRED_ROLE}' role.", - ephemeral=True + ephemeral=True, ) return False - + logger.debug(f"Permission granted for {interaction.user.name}") return True - + async def on_error(self, interaction: discord.Interaction, error: Exception, item: discord.ui.Item): """ Handle errors in button interactions. 
- + Args: interaction: Discord interaction object error: Exception that occurred item: The UI item that triggered the error """ logger.error(f"View error in {self.server_name} by {interaction.user.name}: {str(error)}") - await interaction.response.send_message( - "An error occurred while processing your request.", - ephemeral=True - ) - + await interaction.response.send_message("An error occurred while processing your request.", ephemeral=True) + @discord.ui.button(label="Start", style=discord.ButtonStyle.green, custom_id="start_button") async def start_button(self, interaction: discord.Interaction, button: discord.ui.Button): """Send a start command to the server.""" logger.info(f"Start button pressed for {self.server_name} by {interaction.user.name}") await interaction.response.defer(ephemeral=True) result = await self.api.send_power_action(self.server_id, "start") - - if result.get('status') == 'success': + + if result.get("status") == "success": message = f"Server '{self.server_name}' is starting..." logger.info(f"Successfully started server {self.server_name}") else: message = f"Failed to start server: {result.get('message', 'Unknown error')}" logger.error(f"Failed to start server {self.server_name}: {message}") - + await interaction.followup.send(message, ephemeral=True) - + @discord.ui.button(label="Stop", style=discord.ButtonStyle.red, custom_id="stop_button") async def stop_button(self, interaction: discord.Interaction, button: discord.ui.Button): """Send a stop command to the server.""" logger.info(f"Stop button pressed for {self.server_name} by {interaction.user.name}") await interaction.response.defer(ephemeral=True) result = await self.api.send_power_action(self.server_id, "stop") - - if result.get('status') == 'success': + + if result.get("status") == "success": message = f"Server '{self.server_name}' is stopping..." 
logger.info(f"Successfully stopped server {self.server_name}") else: message = f"Failed to stop server: {result.get('message', 'Unknown error')}" logger.error(f"Failed to stop server {self.server_name}: {message}") - + await interaction.followup.send(message, ephemeral=True) - + @discord.ui.button(label="Restart", style=discord.ButtonStyle.blurple, custom_id="restart_button") async def restart_button(self, interaction: discord.Interaction, button: discord.ui.Button): """Send a restart command to the server.""" logger.info(f"Restart button pressed for {self.server_name} by {interaction.user.name}") await interaction.response.defer(ephemeral=True) result = await self.api.send_power_action(self.server_id, "restart") - - if result.get('status') == 'success': + + if result.get("status") == "success": message = f"Server '{self.server_name}' is restarting..." logger.info(f"Successfully restarted server {self.server_name}") else: message = f"Failed to restart server: {result.get('message', 'Unknown error')}" logger.error(f"Failed to restart server {self.server_name}: {message}") - + await interaction.followup.send(message, ephemeral=True) - - @discord.ui.button(label="Show Address", style=discord.ButtonStyle.grey, custom_id="show_address_button") + + @discord.ui.button( + label="Show Address", + style=discord.ButtonStyle.grey, + custom_id="show_address_button", + ) async def show_address_button(self, interaction: discord.Interaction, button: discord.ui.Button): """Show server's default allocation IP and port using client API.""" logger.info(f"Show Address button pressed for {self.server_name} by {interaction.user.name}") try: await interaction.response.defer(ephemeral=True) logger.debug(f"Fetching server details for {self.server_id}") - + # Get server details using client API - server_details = await self.api._request( - "GET", - f"client/servers/{self.server_id}", - use_application_key=False - ) - - if server_details.get('status') == 'error': - error_msg = 
server_details.get('message', 'Unknown error') + server_details = await self.api._request("GET", f"client/servers/{self.server_id}", use_application_key=False) + + if server_details.get("status") == "error": + error_msg = server_details.get("message", "Unknown error") logger.error(f"Failed to get server details for {self.server_id}: {error_msg}") raise ValueError(error_msg) - - attributes = server_details.get('attributes', {}) - relationships = attributes.get('relationships', {}) - allocations = relationships.get('allocations', {}).get('data', []) - + + attributes = server_details.get("attributes", {}) + relationships = attributes.get("relationships", {}) + allocations = relationships.get("allocations", {}).get("data", []) + if not allocations: logger.warning(f"No allocations found for server {self.server_id}") raise ValueError("No allocations found for this server") - + # Find the default allocation (is_default=True) default_allocation = next( - (alloc for alloc in allocations - if alloc.get('attributes', {}).get('is_default', False)), - allocations[0] # Fallback to first allocation if no default found + (alloc for alloc in allocations if alloc.get("attributes", {}).get("is_default", False)), + allocations[0], # Fallback to first allocation if no default found ) - - allocation_attrs = default_allocation.get('attributes', {}) - ip_alias = allocation_attrs.get('ip_alias', 'Unknown') - port = str(allocation_attrs.get('port', 'Unknown')) - + + allocation_attrs = default_allocation.get("attributes", {}) + ip_alias = allocation_attrs.get("ip_alias", "Unknown") + port = str(allocation_attrs.get("port", "Unknown")) + logger.debug(f"Retrieved connection info for {self.server_id}: {ip_alias}:{port}") - + # Create and send embed embed = discord.Embed( title=f"{self.server_name} Connection Info", color=discord.Color.blue(), - description=f"Server ID: `{self.server_id}`" + description=f"Server ID: `{self.server_id}`", ) embed.add_field(name="Address", value=f"`{ip_alias}`", 
inline=True) embed.add_field(name="Port", value=f"`{port}`", inline=True) - + await interaction.followup.send(embed=embed, ephemeral=True) logger.info(f"Displayed connection info for {self.server_name}") - + except Exception as e: logger.error(f"Failed to show address for {self.server_name}: {str(e)}") await interaction.followup.send( "โš ๏ธ Failed to get connection info. The server may not have any ports allocated.", - ephemeral=True + ephemeral=True, ) + # ============================================== # MAIN BOT CLASS # ============================================== + class PterodactylBot(commands.Bot): """ Main bot class for Pterodactyl server management. Handles Discord interactions, embed management, and background tasks. Manages server status embeds and user commands. """ - + def __init__(self, *args, **kwargs): """ Initialize the Pterodactyl bot instance. @@ -555,65 +568,61 @@ class PterodactylBot(commands.Bot): self.server_cache: Dict[str, dict] = {} # Cache of server data from Pterodactyl self.embed_locations: Dict[str, Dict[str, int]] = {} # Tracks where embeds are posted self.update_lock = asyncio.Lock() # Prevents concurrent updates - self.embed_storage_path = Path(EMBED_LOCATIONS_FILE) # File to store embed - self.metrics_manager = ServerMetricsManager() # Data manager for metrics graphing system + self.embed_storage_path = Path(EMBED_LOCATIONS_FILE) # File to store embed + self.metrics_manager = ServerMetricsManager() # Data manager for metrics graphing system # Track previous server states and CPU usage to detect changes # Format: {server_id: (state, cpu_usage, last_force_update)} self.previous_states: Dict[str, Tuple[str, float, Optional[float]]] = {} logger.info("Initialized PterodactylBot instance with state tracking") - + async def setup_hook(self): """ Bot setup routine called when the bot is starting. Initializes API client, loads saved data, and starts background tasks. 
""" logger.info("Running bot setup hook") - + # Initialize API client - self.pterodactyl_api = PterodactylAPI( - PTERODACTYL_URL, - PTERODACTYL_CLIENT_API_KEY, - PTERODACTYL_APPLICATION_API_KEY - ) + self.pterodactyl_api = PterodactylAPI(PTERODACTYL_URL, PTERODACTYL_CLIENT_API_KEY, PTERODACTYL_APPLICATION_API_KEY) await self.pterodactyl_api.initialize() logger.info("Initialized Pterodactyl API client") - + # Load saved embed locations await self.load_embed_locations() - + # Start background update task self.update_status.start() logger.info("Started background status update task") - + async def load_embed_locations(self): """Load saved embed locations from JSON storage file.""" logger.debug("Attempting to load embed locations from storage") if not self.embed_storage_path.exists(): logger.info("No existing embed locations file found") return - + try: - with open(self.embed_storage_path, 'r') as f: + with open(self.embed_storage_path, "r") as f: self.embed_locations = json.load(f) logger.info(f"Loaded {len(self.embed_locations)} embed locations from storage") except Exception as e: logger.error(f"Failed to load embed locations: {str(e)}") - + async def save_embed_locations(self): """Save current embed locations to JSON storage file.""" logger.debug("Attempting to save embed locations to storage") try: - with open(self.embed_storage_path, 'w') as f: + with open(self.embed_storage_path, "w") as f: json.dump(self.embed_locations, f, indent=2) logger.debug("Successfully saved embed locations to disk") except Exception as e: logger.error(f"Failed to save embed locations: {str(e)}") - + async def refresh_all_embeds(self) -> Tuple[int, int]: """ Perform a complete refresh of all server status embeds. Creates new embeds and deletes old ones to prevent duplication. 
- + Returns: Tuple of (deleted_count, created_count) - number of embeds processed """ @@ -621,7 +630,7 @@ class PterodactylBot(commands.Bot): async with self.update_lock: try: await asyncio.sleep(1) # Initial delay - + # Get current server list if cache is empty if not self.server_cache: logger.debug("Server cache empty, fetching fresh server list") @@ -629,65 +638,65 @@ class PterodactylBot(commands.Bot): if not servers: logger.warning("No servers found in Pterodactyl panel") return 0, 0 - - self.server_cache = {server['attributes']['identifier']: server for server in servers} + + self.server_cache = {server["attributes"]["identifier"]: server for server in servers} logger.info(f"Populated server cache with {len(servers)} servers") - + # Create new embeds in temporary storage new_embeds = {} created_count = 0 skipped_count = 0 - + for server_id, server_data in self.server_cache.items(): # Skip if we don't have an existing location to recreate in if server_id not in self.embed_locations: skipped_count += 1 continue - - channel_id = self.embed_locations[server_id]['channel_id'] + + channel_id = self.embed_locations[server_id]["channel_id"] channel = self.get_channel(int(channel_id)) if not channel: logger.warning(f"Channel {channel_id} not found for server {server_id}") continue - + try: logger.debug(f"Creating new embed for server {server_id}") # Get current server status resources = await self.pterodactyl_api.get_server_resources(server_id) - + # Create new embed embed, view = await self.get_server_status_embed(server_data, resources) message = await channel.send(embed=embed, view=view) - + # Store in temporary location new_embeds[server_id] = { - 'channel_id': str(channel.id), - 'message_id': str(message.id) + "channel_id": str(channel.id), + "message_id": str(message.id), } created_count += 1 logger.info(f"Created new embed for server {server_data['attributes']['name']}") - + await asyncio.sleep(1) # Rate limit protection except Exception as e: 
logger.error(f"Failed to create new embed for server {server_id}: {str(e)}") - + logger.info(f"Created {created_count} new embeds, skipped {skipped_count} servers") - + # Only proceed if we created at least one new embed if not new_embeds: logger.warning("No new embeds created during refresh") return 0, 0 - + # Now delete old embeds deleted_count = 0 not_found_count = 0 - + for server_id, location in list(self.embed_locations.items()): try: - channel = self.get_channel(int(location['channel_id'])) + channel = self.get_channel(int(location["channel_id"])) if channel: try: - message = await channel.fetch_message(int(location['message_id'])) + message = await channel.fetch_message(int(location["message_id"])) await message.delete() deleted_count += 1 logger.debug(f"Deleted old embed for server {server_id}") @@ -699,96 +708,108 @@ class PterodactylBot(commands.Bot): logger.error(f"Failed to delete old embed for server {server_id}: {str(e)}") except Exception as e: logger.error(f"Error processing old embed for server {server_id}: {str(e)}") - + logger.info(f"Deleted {deleted_count} old embeds, {not_found_count} already missing") - + # Update storage with new embed locations self.embed_locations = new_embeds await self.save_embed_locations() - + return deleted_count, created_count - + except Exception as e: logger.error(f"Critical error during embed refresh: {str(e)}") raise - + async def track_new_embed(self, server_id: str, message: discord.Message): """ Track a newly created embed in storage. 
- + Args: server_id: The server's Pterodactyl identifier message: Discord message containing the embed """ logger.debug(f"Tracking new embed for server {server_id} in channel {message.channel.id}") self.embed_locations[server_id] = { - 'channel_id': str(message.channel.id), - 'message_id': str(message.id) + "channel_id": str(message.channel.id), + "message_id": str(message.id), } await self.save_embed_locations() - + async def get_server_status_embed(self, server_data: dict, resources: dict) -> Tuple[discord.Embed, ServerStatusView]: """ Create a status embed and view for a server. - + Args: server_data: Server information from Pterodactyl resources: Current resource usage data - + Returns: Tuple of (embed, view) objects ready for display """ - attributes = server_data.get('attributes', {}) - identifier = attributes.get('identifier', 'unknown') - name = attributes.get('name', 'Unknown Server') - description = attributes.get('description', 'No description available') + attributes = server_data.get("attributes", {}) + identifier = attributes.get("identifier", "unknown") + name = attributes.get("name", "Unknown Server") + description = attributes.get("description", "No description available") logger.debug(f"Building status embed for server {name} ({identifier})") - + # Parse resource data - resource_attributes = resources.get('attributes', {}) - current_state = resource_attributes.get('current_state', 'offline').title() - is_suspended = attributes.get('suspended', False) - + resource_attributes = resources.get("attributes", {}) + current_state = resource_attributes.get("current_state", "offline").title() + is_suspended = attributes.get("suspended", False) + # Create embed with appropriate color based on status embed = discord.Embed( title=f"{name} - {current_state}", description=description, - color=discord.Color.blue() if current_state.lower() == "running" else discord.Color.red(), - timestamp=datetime.now() + color=(discord.Color.blue() if current_state.lower() == 
"running" else discord.Color.red()), + timestamp=datetime.now(), ) - + embed.add_field(name="๐Ÿ†” Server ID", value=f"`{identifier}`", inline=True) - + if is_suspended: embed.add_field(name="โ„น๏ธ Status", value="โ›” `Suspended`", inline=True) else: embed.add_field(name="โ„น๏ธ Status", value="โœ… `Active`", inline=True) - + # Add resource usage if server is running if current_state.lower() != "offline": # Current usage - cpu_usage = round(resource_attributes.get('resources', {}).get('cpu_absolute', 0), 2) - memory_usage = round(resource_attributes.get('resources', {}).get('memory_bytes', 0) / (1024 ** 2), 2) - disk_usage = round(resource_attributes.get('resources', {}).get('disk_bytes', 0) / (1024 ** 2), 2) - network_rx = round(resource_attributes.get('resources', {}).get('network_rx_bytes', 0) / (1024 ** 2), 2) - network_tx = round(resource_attributes.get('resources', {}).get('network_tx_bytes', 0) / (1024 ** 2), 2) - + cpu_usage = round(resource_attributes.get("resources", {}).get("cpu_absolute", 0), 2) + memory_usage = round( + resource_attributes.get("resources", {}).get("memory_bytes", 0) / (1024**2), + 2, + ) + disk_usage = round( + resource_attributes.get("resources", {}).get("disk_bytes", 0) / (1024**2), + 2, + ) + network_rx = round( + resource_attributes.get("resources", {}).get("network_rx_bytes", 0) / (1024**2), + 2, + ) + network_tx = round( + resource_attributes.get("resources", {}).get("network_tx_bytes", 0) / (1024**2), + 2, + ) + # Maximum allocated resources from server data - limits = attributes.get('limits', {}) - cpu_limit = limits.get('cpu', 0) - memory_limit = limits.get('memory', 0) - disk_limit = limits.get('disk', 0) + limits = attributes.get("limits", {}) + cpu_limit = limits.get("cpu", 0) + memory_limit = limits.get("memory", 0) + disk_limit = limits.get("disk", 0) # Format limit values - display โˆž for unlimited (0 limit) def format_limit(value, unit=""): if value == 0: - return f"{'โˆž':<8}{unit}" # Lemniscate symbol for infinity + 
return f"{'โˆž':<8}]{unit}" # Lemniscate symbol for infinity else: - return f"{value:<8}{unit}" - + return f"{value:<8}]{unit}" + # Get uptime from Pterodactyl API (in milliseconds) - uptime_ms = resource_attributes.get('resources', {}).get('uptime', 0) - + uptime_ms = resource_attributes.get("resources", {}).get("uptime", 0) + # Format uptime for display if uptime_ms > 0: uptime_seconds = uptime_ms // 1000 # Convert ms to seconds @@ -806,36 +827,24 @@ class PterodactylBot(commands.Bot): uptime_text = f"`{days}d {hours}h`" else: uptime_text = "`Just started`" - + embed.add_field(name="โฑ๏ธ Uptime", value=uptime_text, inline=True) - + # Create dedicated usage text box with current usage and limits in monospace font usage_text = ( f"```properties\n" - f"CPU: {cpu_usage:>8} / {format_limit(cpu_limit, ' %')}\n" - f"Memory: {memory_usage:>8} / {format_limit(memory_limit, ' MiB')}\n" - f"Disk: {disk_usage:>8} / {format_limit(disk_limit, ' MiB')}\n" + f"CPU : [{cpu_usage:>8} / {format_limit(cpu_limit, ' %')}\n" + f"Memory : [{memory_usage:>8} / {format_limit(memory_limit, ' MiB')}\n" + f"Disk : [{disk_usage:>8} / {format_limit(disk_limit, ' MiB')}\n" f"```" ) - - embed.add_field( - name="๐Ÿ“Š Resource Usage", - value=usage_text, - inline=False - ) - - embed.add_field( - name="Network In", - value=f"๐Ÿ“ฅ `{network_rx} MiB`", - inline=True - ) - embed.add_field( - name="Network Out", - value=f"๐Ÿ“ค `{network_tx} MiB`", - inline=True - ) - + embed.add_field(name="๐Ÿ“Š Resource Usage", value=usage_text, inline=False) + + embed.add_field(name="Network In", value=f"๐Ÿ“ฅ `{network_rx} MiB`", inline=True) + + embed.add_field(name="Network Out", value=f"๐Ÿ“ค `{network_tx} MiB`", inline=True) + # Add graph images if available server_graphs = self.metrics_manager.get_server_graphs(identifier) if server_graphs and server_graphs.has_sufficient_data: @@ -845,30 +854,30 @@ class PterodactylBot(commands.Bot): f">>> `Data points: {summary['point_count']}/6`\n" f"`CPU trend: 
{summary['cpu_trend']} โ€ข Memory trend: {summary['memory_trend']}`" ) - + # Add a field explaining the graphs embed.add_field( name="๐Ÿ“ˆ Usage Trends (Last Minute)", value=graph_description, - inline=False + inline=False, ) - + # Set graph images (these will be attached as files in the update_status method) embed.set_image(url=f"attachment://metrics_graph_{identifier}.png") - + embed.set_footer(text="Last updated") - + # Create interactive view with control buttons view = ServerStatusView( server_id=identifier, server_name=name, pterodactyl_api=self.pterodactyl_api, - server_data=server_data + server_data=server_data, ) - + logger.debug(f"Successfully built status components for {name}") return embed, view - + @tasks.loop(seconds=UPDATE_INTERVAL) async def update_status(self): """ @@ -877,7 +886,7 @@ class PterodactylBot(commands.Bot): 2. Significant CPU usage change (>50% difference) 3. First time seeing the server 4. Server has been running for 10 minutes (force update for uptime) - + This minimizes API calls to Discord and updates while maintaining real-time awareness of important server changes. 
""" @@ -889,124 +898,145 @@ class PterodactylBot(commands.Bot): if not servers: logger.warning("No servers found in Pterodactyl panel during update") return - + # Update our local cache with fresh server data - self.server_cache = {server['attributes']['identifier']: server for server in servers} + self.server_cache = {server["attributes"]["identifier"]: server for server in servers} logger.debug(f"Updated server cache with {len(servers)} servers") # Clean up metrics for servers that no longer exist active_server_ids = list(self.server_cache.keys()) self.metrics_manager.cleanup_old_servers(active_server_ids) - + # Variables to track our update statistics update_count = 0 # Successful updates error_count = 0 # Failed updates missing_count = 0 # Missing embeds skipped_count = 0 # Servers that didn't need updates current_time = datetime.now().timestamp() - + # Process each server we're tracking embeds for for server_id, location in list(self.embed_locations.items()): # Skip if server no longer exists in Pterodactyl if server_id not in self.server_cache: logger.warning(f"Server {server_id} not found in cache, skipping update") continue - + server_data = self.server_cache[server_id] - server_name = server_data['attributes']['name'] - + server_name = server_data["attributes"]["name"] + try: logger.debug(f"Checking status for server {server_name} ({server_id})") - + # Get current server resource usage resources = await self.pterodactyl_api.get_server_resources(server_id) - current_state = resources.get('attributes', {}).get('current_state', 'offline') - cpu_usage = round(resources.get('attributes', {}).get('resources', {}).get('cpu_absolute', 0), 2) + current_state = resources.get("attributes", {}).get("current_state", "offline") + cpu_usage = round( + resources.get("attributes", {}).get("resources", {}).get("cpu_absolute", 0), + 2, + ) # Collect metrics data for running servers - if current_state == 'running': - memory_usage = round(resources.get('attributes', 
{}).get('resources', {}).get('memory_bytes', 0) / (1024 ** 2), 2) + if current_state == "running": + memory_usage = round( + resources.get("attributes", {}).get("resources", {}).get("memory_bytes", 0) / (1024**2), + 2, + ) self.metrics_manager.add_server_data(server_id, server_name, cpu_usage, memory_usage) logger.debug(f"Added metrics data for {server_name}: CPU={cpu_usage}%, Memory={memory_usage}MB") - + # Retrieve previous recorded state, CPU usage, and last force update time prev_state, prev_cpu, last_force_update = self.previous_states.get(server_id, (None, 0, None)) - + # DECISION LOGIC: Should we update the embed? needs_update = False - + # 1. Check if power state changed (most important) if current_state != prev_state: logger.debug(f"Power state changed for {server_name}: {prev_state} -> {current_state}") needs_update = True - + # 2. Check for significant CPU change (only if server is running) - elif current_state == 'running' and abs(cpu_usage - prev_cpu) > 50: + elif current_state == "running" and abs(cpu_usage - prev_cpu) > 50: logger.debug(f"Significant CPU change for {server_name}: {prev_cpu}% -> {cpu_usage}%") needs_update = True - + # 3. First time we're seeing this server (initial update) elif prev_state is None: logger.debug(f"First check for {server_name}, performing initial update") needs_update = True - + # 4. 
Force update every 10 minutes for running servers (for uptime counter) - elif (current_state == 'running' and - (last_force_update is None or - current_time - last_force_update >= 600)): # 10 minutes = 600 seconds + elif current_state == "running" and ( + last_force_update is None or current_time - last_force_update >= 600 + ): # 10 minutes = 600 seconds logger.debug(f"Executing 10-minute force update for running server {server_name}") needs_update = True # Update the last force update time last_force_update = current_time - + # PERFORM UPDATE IF NEEDED if needs_update: # Generate fresh embed and view components embed, view = await self.get_server_status_embed(server_data, resources) - + # Get the channel where this server's embed lives - channel = self.get_channel(int(location['channel_id'])) + channel = self.get_channel(int(location["channel_id"])) if not channel: logger.warning(f"Channel {location['channel_id']} not found for server {server_id}") continue - + # Fetch and update the existing message - message = await channel.fetch_message(int(location['message_id'])) - + message = await channel.fetch_message(int(location["message_id"])) + # Check if server is transitioning to offline/stopping state # and remove image attachment if present files = [] server_graphs = self.metrics_manager.get_server_graphs(server_id) - + # Only include graph images if server is running AND has sufficient data - if (current_state == 'running' and - server_graphs and - server_graphs.has_sufficient_data): + if current_state == "running" and server_graphs and server_graphs.has_sufficient_data: # Generate metrics graph combined_graph = server_graphs.generate_combined_graph() if combined_graph: - files.append(discord.File(combined_graph, filename=f"metrics_graph_{server_id}.png")) + files.append( + discord.File( + combined_graph, + filename=f"metrics_graph_{server_id}.png", + ) + ) logger.debug(f"Including metrics graph for running server {server_name}") else: # Server is offline/stopping 
- ensure no image is attached logger.debug(f"Server {server_name} is {current_state}, removing image attachment if present") # We'll update without files to remove any existing attachments - + # Update message with embed, view, and files (empty files list removes attachments) await message.edit(embed=embed, view=view, attachments=files) update_count += 1 logger.debug(f"Updated status for {server_name}") - + # Update our state tracking with new values # Only update last_force_update if this was a force update - new_last_force_update = last_force_update if needs_update and current_state == 'running' and current_time - (last_force_update or 0) >= 600 else (last_force_update if last_force_update is not None else None) - self.previous_states[server_id] = (current_state, cpu_usage, new_last_force_update) + new_last_force_update = ( + last_force_update + if needs_update and current_state == "running" and current_time - (last_force_update or 0) >= 600 + else (last_force_update if last_force_update is not None else None) + ) + self.previous_states[server_id] = ( + current_state, + cpu_usage, + new_last_force_update, + ) else: # No significant changes detected, but update tracking with current state - self.previous_states[server_id] = (current_state, cpu_usage, last_force_update) + self.previous_states[server_id] = ( + current_state, + cpu_usage, + last_force_update, + ) skipped_count += 1 logger.debug(f"No changes detected for {server_name}, skipping update") - + except discord.NotFound: # Embed message was deleted - clean up our tracking logger.warning(f"Embed for server {server_id} not found, removing from tracking") @@ -1017,10 +1047,10 @@ class PterodactylBot(commands.Bot): except Exception as e: logger.error(f"Failed to update status for server {server_id}: {str(e)}") error_count += 1 - + # Small delay between servers to avoid rate limits await asyncio.sleep(0.5) - + # Log summary of this update cycle logger.info( f"Update cycle complete: " @@ -1029,19 +1059,19 @@ 
class PterodactylBot(commands.Bot): f"{missing_count} missing, " f"{error_count} errors" ) - + except Exception as e: logger.error(f"Error in update_status task: {str(e)}") # If something went wrong, wait before retrying await asyncio.sleep(5) - + @update_status.before_loop async def before_update_status(self): """Wait for bot to be ready before starting update task.""" logger.debug("Waiting for bot readiness before starting update task") await self.wait_until_ready() await self.refresh_all_embeds() - + @update_status.after_loop async def after_update_status(self): """Handle update task stopping.""" @@ -1049,7 +1079,7 @@ class PterodactylBot(commands.Bot): logger.info("Server status update task was cancelled") elif self.update_status.failed(): logger.error("Server status update task failed") - + async def close(self): """Cleanup when bot is shutting down.""" logger.info("Bot shutdown initiated - performing cleanup") @@ -1058,6 +1088,7 @@ class PterodactylBot(commands.Bot): await self.pterodactyl_api.close() await super().close() + # ============================================== # DISCORD COMMANDS # ============================================== @@ -1067,37 +1098,36 @@ intents.message_content = True bot = PterodactylBot(command_prefix="!", intents=intents) + async def check_allowed_guild(interaction: discord.Interaction) -> bool: """ Verify that an interaction is coming from the allowed guild. 
- + Args: interaction: Discord interaction object - + Returns: bool: True if interaction is allowed, False otherwise """ if interaction.guild_id != ALLOWED_GUILD_ID: logger.warning(f"Command attempted from unauthorized guild {interaction.guild_id} by {interaction.user.name}") - await interaction.response.send_message( - "This bot is only available in a specific server.", - ephemeral=True - ) + await interaction.response.send_message("This bot is only available in a specific server.", ephemeral=True) return False return True + @bot.tree.command(name="server_status", description="Get a list of available game servers to control") async def server_status(interaction: discord.Interaction): """ Slash command to display server status dashboard with interactive dropdown selection. - + This command provides a comprehensive server management interface by: 1. Fetching current server list from Pterodactyl panel 2. Generating real-time statistics (online/offline counts) 3. Displaying an informational embed with server statistics 4. Presenting an ephemeral dropdown menu with all available servers 5. 
Handling server selection to create permanent status embeds in the channel - + Workflow: - Validates guild permissions and defers ephemeral response - Refreshes server cache from Pterodactyl API @@ -1107,32 +1137,32 @@ async def server_status(interaction: discord.Interaction): - Handles user selection via ephemeral dropdown interaction - Creates permanent status embed in channel upon selection - Manages embed tracking and cleanup of previous embeds - + Ephemeral Design: - Initial dashboard and dropdown are ephemeral (visible only to user) - Automatically disappears after use or timeout (3 minutes) - No manual cleanup required for dropdown interface - Only final server status embed is posted publicly - + Error Handling: - Handles API failures during server enumeration - Manages missing servers between selection and execution - Provides user-friendly error messages for all failure scenarios - Maintains comprehensive logging for troubleshooting - + Args: interaction: Discord interaction object representing the command invocation - + Returns: None: Sends ephemeral dashboard with dropdown, then public status embed on selection """ # Check if interaction is from allowed guild if not await check_allowed_guild(interaction): return - + logger.info(f"Server status command invoked by {interaction.user.name}") await interaction.response.defer(ephemeral=True) - + try: # Refresh server cache with current data from Pterodactyl panel servers = await bot.pterodactyl_api.get_servers() @@ -1140,77 +1170,69 @@ async def server_status(interaction: discord.Interaction): logger.warning("No servers found in Pterodactyl panel") await interaction.followup.send("No servers found in the Pterodactyl panel.", ephemeral=True) return - - bot.server_cache = {server['attributes']['identifier']: server for server in servers} + + bot.server_cache = {server["attributes"]["identifier"]: server for server in servers} logger.debug(f"Refreshed server cache with {len(servers)} servers") - + # Count 
online/offline servers by checking each server's current state online_count = 0 offline_count = 0 - + # Check status for each server to generate accurate statistics for server_id, server_data in bot.server_cache.items(): resources = await bot.pterodactyl_api.get_server_resources(server_id) - current_state = resources.get('attributes', {}).get('current_state', 'offline') - - if current_state == 'running': + current_state = resources.get("attributes", {}).get("current_state", "offline") + + if current_state == "running": online_count += 1 else: offline_count += 1 - + # Create statistics embed with visual server status breakdown stats_embed = discord.Embed( title="๐Ÿ—๏ธ Server Status Dashboard", description="Select a server from the dropdown below to view its detailed status and controls.", color=discord.Color.blue(), - timestamp=datetime.now() + timestamp=datetime.now(), ) - + stats_embed.add_field( name="๐Ÿ“Š Server Statistics", - value=f"**Total Servers:** {len(servers)}\n" - f"โœ… **Online:** {online_count}\n" - f"โŒ **Offline:** {offline_count}", - inline=False + value=f"**Total Servers:** {len(servers)}\n" f"โœ… **Online:** {online_count}\n" f"โŒ **Offline:** {offline_count}", + inline=False, ) - + stats_embed.add_field( name="โ„น๏ธ How to Use", value="Use the dropdown menu below to select a server. 
The server's status embed will be posted in this channel.", - inline=False + inline=False, ) - + stats_embed.set_footer(text="Server status will update automatically") - + # Create dropdown menu options from available servers server_options = [] for server_id, server_data in bot.server_cache.items(): - server_name = server_data['attributes']['name'] - server_description = server_data['attributes'].get('description', 'No description') - + server_name = server_data["attributes"]["name"] + server_description = server_data["attributes"].get("description", "No description") + # Truncate description if too long for dropdown constraints if len(server_description) > 50: server_description = server_description[:47] + "..." - - server_options.append( - discord.SelectOption( - label=server_name, - value=server_id, - description=server_description - ) - ) - + + server_options.append(discord.SelectOption(label=server_name, value=server_id, description=server_description)) + # Create dropdown view with timeout for automatic cleanup class ServerDropdownView(discord.ui.View): def __init__(self, server_options, timeout=180): # 3 minute timeout for ephemeral cleanup super().__init__(timeout=timeout) self.server_options = server_options self.add_item(ServerDropdown(server_options)) - + async def on_timeout(self): # Clean up when dropdown times out (ephemeral auto-removal) logger.debug("Server dropdown timed out and was automatically cleaned up") - + # Dropdown selection handler for server choice class ServerDropdown(discord.ui.Select): def __init__(self, server_options): @@ -1218,129 +1240,121 @@ async def server_status(interaction: discord.Interaction): placeholder="Select a server to display...", options=server_options, min_values=1, - max_values=1 + max_values=1, ) - + async def callback(self, interaction: discord.Interaction): """ Handle server selection from dropdown menu. Creates permanent status embed in the channel for the selected server. 
""" await interaction.response.defer(ephemeral=True) - + selected_server_id = self.values[0] server_data = bot.server_cache.get(selected_server_id) - + if not server_data: await interaction.followup.send( "โŒ Selected server no longer available. Please try again.", - ephemeral=True + ephemeral=True, ) return - - server_name = server_data['attributes']['name'] + + server_name = server_data["attributes"]["name"] logger.info(f"User {interaction.user.name} selected server: {server_name}") - + try: # Get current server status for embed creation resources = await bot.pterodactyl_api.get_server_resources(selected_server_id) - + # Delete old embed if it exists to prevent duplication if selected_server_id in bot.embed_locations: logger.debug(f"Found existing embed for {selected_server_id}, attempting to delete") try: old_location = bot.embed_locations[selected_server_id] - old_channel = bot.get_channel(int(old_location['channel_id'])) + old_channel = bot.get_channel(int(old_location["channel_id"])) if old_channel: try: - old_message = await old_channel.fetch_message(int(old_location['message_id'])) + old_message = await old_channel.fetch_message(int(old_location["message_id"])) await old_message.delete() logger.debug(f"Deleted old embed for {selected_server_id}") except discord.NotFound: logger.debug(f"Old embed for {selected_server_id} already deleted") except Exception as e: logger.error(f"Failed to delete old embed: {str(e)}") - + # Create and send new permanent status embed in channel embed, view = await bot.get_server_status_embed(server_data, resources) message = await interaction.channel.send(embed=embed, view=view) await bot.track_new_embed(selected_server_id, message) - + await interaction.followup.send( f"โœ… **{server_name}** status has been posted in {interaction.channel.mention}", - ephemeral=True + ephemeral=True, ) logger.info(f"Successfully posted status for {server_name}") - + except Exception as e: logger.error(f"Failed to create status embed for 
{server_name}: {str(e)}") await interaction.followup.send( f"โŒ Failed to create status embed for **{server_name}**: {str(e)}", - ephemeral=True + ephemeral=True, ) - + # Send the initial dashboard embed with dropdown (ephemeral - auto-cleaned) - await interaction.followup.send( - embed=stats_embed, - view=ServerDropdownView(server_options), - ephemeral=True - ) + await interaction.followup.send(embed=stats_embed, view=ServerDropdownView(server_options), ephemeral=True) logger.info(f"Sent server status dashboard to {interaction.user.name} with {len(server_options)} servers") - + except Exception as e: logger.error(f"Server status command failed: {str(e)}") - await interaction.followup.send( - f"โŒ Failed to load server status: {str(e)}", - ephemeral=True - ) + await interaction.followup.send(f"โŒ Failed to load server status: {str(e)}", ephemeral=True) + @bot.tree.command(name="refresh_embeds", description="Refresh all server status embeds (admin only)") async def refresh_embeds(interaction: discord.Interaction): """Slash command to refresh all server embeds.""" if not await check_allowed_guild(interaction): return - + logger.info(f"Refresh embeds command invoked by {interaction.user.name}") await interaction.response.defer(ephemeral=True) - + # Require administrator permissions if not interaction.user.guild_permissions.administrator: logger.warning(f"Unauthorized refresh attempt by {interaction.user.name}") - await interaction.followup.send( - "You need administrator permissions to refresh all embeds.", - ephemeral=True - ) + await interaction.followup.send("You need administrator permissions to refresh all embeds.", ephemeral=True) return - + try: logger.info("Starting full embed refresh per admin request") deleted, created = await bot.refresh_all_embeds() await interaction.followup.send( f"Refreshed all embeds. 
Deleted {deleted} old embeds, created {created} new ones.", - ephemeral=True + ephemeral=True, ) logger.info(f"Embed refresh completed: {deleted} deleted, {created} created") except Exception as e: logger.error(f"Embed refresh failed: {str(e)}") - await interaction.followup.send( - f"Failed to refresh embeds: {str(e)}", - ephemeral=True - ) + await interaction.followup.send(f"Failed to refresh embeds: {str(e)}", ephemeral=True) -@bot.tree.command(name="purge_embeds", description="Permanently delete all server status embeds (admin only)") + +@bot.tree.command( + name="purge_embeds", + description="Permanently delete all server status embeds (admin only)", +) async def purge_embeds(interaction: discord.Interaction): """ Slash command to permanently purge all server status embeds from Discord channels. - + This command performs a complete cleanup of all tracked server status embeds by: 1. Iterating through all tracked embed locations in embed_locations.json 2. Attempting to delete each embed message from its respective Discord channel 3. Clearing the embed tracking file and internal state tracking 4. 
Providing real-time progress updates during the operation - + Args: interaction: Discord interaction object representing the command invocation - + Workflow: - Validates administrator permissions - Checks if any embeds are currently tracked @@ -1349,67 +1363,64 @@ async def purge_embeds(interaction: discord.Interaction): - Updates progress embed in real-time - Saves cleared tracking data to disk - Sends final results with comprehensive statistics - + Safety: - Only affects tracked embeds (won't delete arbitrary messages) - Maintains logs for audit purposes - Provides rollback protection through immediate tracking removal - Includes rate limiting to avoid Discord API limits - + Returns: None: Sends follow-up messages with operation results """ if not await check_allowed_guild(interaction): return - + logger.info(f"Purge embeds command invoked by {interaction.user.name}") await interaction.response.defer(ephemeral=True) - + # Require administrator permissions if not interaction.user.guild_permissions.administrator: logger.warning(f"Unauthorized purge attempt by {interaction.user.name}") - await interaction.followup.send( - "You need administrator permissions to purge all embeds.", - ephemeral=True - ) + await interaction.followup.send("You need administrator permissions to purge all embeds.", ephemeral=True) return - + try: logger.info("Starting embed purge per admin request") - + # Variables to track purge statistics deleted_count = 0 not_found_count = 0 error_count = 0 total_embeds = len(bot.embed_locations) - + if total_embeds == 0: await interaction.followup.send( "No embeds are currently being tracked. 
Nothing to purge.", - ephemeral=True + ephemeral=True, ) return - + # Create progress embed progress_embed = discord.Embed( title="๐Ÿ”„ Purging Server Embeds", description=f"Processing {total_embeds} embeds...", - color=discord.Color.orange() + color=discord.Color.orange(), ) progress_embed.add_field(name="Deleted", value="0", inline=True) progress_embed.add_field(name="Not Found", value="0", inline=True) progress_embed.add_field(name="Errors", value="0", inline=True) progress_embed.set_footer(text="This may take a while...") - + progress_message = await interaction.followup.send(embed=progress_embed, ephemeral=True) - + # Process each tracked embed for server_id, location in list(bot.embed_locations.items()): try: - channel = bot.get_channel(int(location['channel_id'])) + channel = bot.get_channel(int(location["channel_id"])) if channel: try: - message = await channel.fetch_message(int(location['message_id'])) + message = await channel.fetch_message(int(location["message_id"])) await message.delete() deleted_count += 1 logger.debug(f"Successfully purged embed for server {server_id}") @@ -1425,77 +1436,77 @@ async def purge_embeds(interaction: discord.Interaction): else: not_found_count += 1 logger.warning(f"Channel not found for server {server_id}") - + # Remove from tracking immediately bot.embed_locations.pop(server_id, None) bot.previous_states.pop(server_id, None) # Also clean up state tracking - + # Update progress every 5 embeds or for the last one - if (deleted_count + not_found_count + error_count) % 5 == 0 or \ - (deleted_count + not_found_count + error_count) == total_embeds: - + if (deleted_count + not_found_count + error_count) % 5 == 0 or ( + deleted_count + not_found_count + error_count + ) == total_embeds: + progress_embed.description = f"Processed {deleted_count + not_found_count + error_count}/{total_embeds} embeds" progress_embed.set_field_at(0, name="Deleted", value=str(deleted_count), inline=True) progress_embed.set_field_at(1, name="Not 
Found", value=str(not_found_count), inline=True) progress_embed.set_field_at(2, name="Errors", value=str(error_count), inline=True) - + await progress_message.edit(embed=progress_embed) - + # Small delay to avoid rate limits await asyncio.sleep(0.3) - + except Exception as e: error_count += 1 logger.error(f"Unexpected error processing server {server_id}: {str(e)}") - + # Save the cleared embed locations await bot.save_embed_locations() - + # Create results embed result_embed = discord.Embed( title="โœ… Embed Purge Complete", color=discord.Color.green(), - timestamp=datetime.now() + timestamp=datetime.now(), ) result_embed.add_field(name="Total Tracked", value=str(total_embeds), inline=True) result_embed.add_field(name="โœ… Successfully Deleted", value=str(deleted_count), inline=True) result_embed.add_field(name="โŒ Already Missing", value=str(not_found_count), inline=True) result_embed.add_field(name="โš ๏ธ Errors", value=str(error_count), inline=True) - result_embed.add_field(name="๐Ÿ“Š Success Rate", - value=f"{((deleted_count + not_found_count) / total_embeds * 100):.1f}%", - inline=True) + result_embed.add_field( + name="๐Ÿ“Š Success Rate", + value=f"{((deleted_count + not_found_count) / total_embeds * 100):.1f}%", + inline=True, + ) result_embed.set_footer(text="Embed tracking file has been cleared") - + await progress_message.edit(embed=result_embed) logger.info(f"Embed purge completed: {deleted_count} deleted, {not_found_count} not found, {error_count} errors") - + except Exception as e: logger.error(f"Embed purge failed: {str(e)}") - await interaction.followup.send( - f"โŒ Failed to purge embeds: {str(e)}", - ephemeral=True - ) + await interaction.followup.send(f"โŒ Failed to purge embeds: {str(e)}", ephemeral=True) + # ============================================== # BOT EVENTS # ============================================== + @bot.event async def on_interaction(interaction: discord.Interaction): """Global interaction handler to check guild before 
processing any interaction.""" if interaction.guild_id != ALLOWED_GUILD_ID: logger.debug(f"Ignoring interaction from unauthorized guild {interaction.guild_id}") - await interaction.response.send_message( - "This bot is only available in a specific server.", - ephemeral=True - ) + await interaction.response.send_message("This bot is only available in a specific server.", ephemeral=True) return + @bot.event async def on_ready(): """Called when the bot successfully connects to Discord.""" logger.info(f"Bot connected as {bot.user.name} (ID: {bot.user.id})") - + try: # Sync commands only to the allowed guild guild = discord.Object(id=ALLOWED_GUILD_ID) @@ -1505,14 +1516,16 @@ async def on_ready(): except Exception as e: logger.error(f"Command sync failed: {str(e)}") + # ============================================== # SYSTEM SIGNAL HANDLERS # ============================================== + def handle_sigint(signum: int, frame: types.FrameType) -> None: """ Handle SIGINT signals (Ctrl+C) by initiating graceful shutdown. - + Args: signum: The signal number (signal.SIGINT) frame: Current stack frame (unused but required by signal handler signature) @@ -1520,10 +1533,11 @@ def handle_sigint(signum: int, frame: types.FrameType) -> None: logger.info("Received SIGINT (Ctrl+C), initiating graceful shutdown...") raise KeyboardInterrupt + def handle_sigterm(signum: int, frame: types.FrameType) -> None: """ Handle SIGTERM signals (container stop) by initiating graceful shutdown. 
- + Args: signum: The signal number (signal.SIGTERM) frame: Current stack frame (unused but required by signal handler signature) @@ -1531,6 +1545,7 @@ def handle_sigterm(signum: int, frame: types.FrameType) -> None: logger.info("Received SIGTERM (container stop), initiating graceful shutdown...") raise KeyboardInterrupt + # ============================================== # BOT STARTUP # ============================================== @@ -1538,13 +1553,13 @@ def handle_sigterm(signum: int, frame: types.FrameType) -> None: if __name__ == "__main__": """ Main entry point for the bot application. - + Handles: - Signal registration for graceful shutdowns (SIGINT/SIGTERM) - Primary bot execution loop - Error handling and crash reporting - Resource cleanup on shutdown - + Flow: 1. Initialize signal handlers 2. Start bot with Discord token @@ -1554,7 +1569,7 @@ if __name__ == "__main__": logger.info("Starting bot initialization") # Register signal handlers - signal.signal(signal.SIGINT, handle_sigint) # For Ctrl+C + signal.signal(signal.SIGINT, handle_sigint) # For Ctrl+C signal.signal(signal.SIGTERM, handle_sigterm) # For container stop commands logger.info("System signal handlers registered") diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 0000000..8ef58cc --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1,26 @@ +# Testing Dependencies for Pterodactyl Discord Bot + +# Core testing framework +pytest>=7.4.0 +pytest-asyncio>=0.21.0 +pytest-cov>=4.1.0 +pytest-mock>=3.11.1 +pytest-timeout>=2.1.0 + +# Code quality and linting +flake8>=6.0.0 +pylint>=2.17.0 +black>=23.7.0 +isort>=5.12.0 + +# Security scanning +bandit>=1.7.5 +safety>=2.3.5 + +# Mocking and fixtures +pytest-fixtures>=0.1.0 +freezegun>=1.2.2 + +# Coverage reporting +coverage>=7.2.7 +coverage-badge>=1.1.0 \ No newline at end of file diff --git a/server_metrics_graphs.py b/server_metrics_graphs.py index d83cafa..8d6e89b 100644 --- a/server_metrics_graphs.py +++ 
b/server_metrics_graphs.py @@ -5,24 +5,27 @@ This module provides graphing capabilities for server CPU and memory usage. Generates line graphs as PNG images for embedding in Discord messages. """ -import matplotlib -matplotlib.use('Agg') # Use non-interactive backend for server environments -import matplotlib.pyplot as plt -import matplotlib.dates as mdates -from collections import deque -from datetime import datetime, timedelta -from typing import Dict, Tuple, Optional import io import logging import math +from collections import deque +from datetime import datetime +from typing import Dict, Optional + +import matplotlib +import matplotlib.dates as mdates +import matplotlib.pyplot as plt + +matplotlib.use("Agg") # Use non-interactive backend for server environments # Get the logger from the main bot module -logger = logging.getLogger('pterodisbot') +logger = logging.getLogger("pterodisbot") + class ServerMetricsGraphs: """ Manages CPU and memory usage graphs for individual servers. - + Features: - Stores last 6 data points (1 minute of history at 10-second intervals) - Generates PNG images of line graphs for Discord embedding @@ -31,31 +34,31 @@ class ServerMetricsGraphs: - Dynamic CPU scaling in 100% increments for multi-vCPU servers - Clean graph styling optimized for Discord dark theme """ - + def __init__(self, server_id: str, server_name: str): """ Initialize metrics tracking for a server. 
- + Args: server_id: Pterodactyl server identifier server_name: Human-readable server name """ self.server_id = server_id self.server_name = server_name - + # Use deque with maxlen=6 for automatic FIFO rotation # Each entry is a tuple: (timestamp, cpu_percent, memory_mb) self.data_points = deque(maxlen=6) - + # Track if we have enough data for meaningful graphs (at least 2 points) self.has_sufficient_data = False - + logger.debug(f"Initialized metrics tracking for server {server_name} ({server_id})") - + def add_data_point(self, cpu_percent: float, memory_mb: float, timestamp: Optional[datetime] = None): """ Add a new data point to the metrics history. - + Args: cpu_percent: Current CPU usage percentage memory_mb: Current memory usage in megabytes @@ -63,351 +66,404 @@ class ServerMetricsGraphs: """ if timestamp is None: timestamp = datetime.now() - + # Add new data point (automatically rotates old data due to maxlen=6) self.data_points.append((timestamp, cpu_percent, memory_mb)) - + # Update sufficient data flag self.has_sufficient_data = len(self.data_points) >= 2 - + logger.debug(f"Added metrics data point for {self.server_name}: CPU={cpu_percent}%, Memory={memory_mb}MB") - + def _calculate_cpu_scale_limit(self, max_cpu_value: float) -> int: """ Calculate appropriate CPU scale limit in 100% increments. - + Args: max_cpu_value: Maximum CPU value in the dataset - + Returns: Scale limit rounded up to nearest 100% increment """ if max_cpu_value <= 100: return 100 - + # Round up to nearest 100% increment # e.g., 150% -> 200%, 250% -> 300%, 350% -> 400% return math.ceil(max_cpu_value / 100) * 100 - + def generate_cpu_graph(self) -> Optional[io.BytesIO]: """ Generate a CPU usage line graph as a PNG image. 
- + Returns: BytesIO object containing PNG image data, or None if insufficient data """ if not self.has_sufficient_data: logger.debug(f"Insufficient data for CPU graph generation: {self.server_name}") return None - + try: # Extract timestamps and CPU data timestamps = [point[0] for point in self.data_points] cpu_values = [point[1] for point in self.data_points] - + # Calculate dynamic CPU scale limit max_cpu = max(cpu_values) cpu_scale_limit = self._calculate_cpu_scale_limit(max_cpu) - + # Create figure with dark theme styling - plt.style.use('dark_background') + plt.style.use("dark_background") fig, ax = plt.subplots(figsize=(8, 4), dpi=100) - fig.patch.set_facecolor('#2f3136') # Discord dark theme background - ax.set_facecolor('#36393f') # Slightly lighter for graph area - + fig.patch.set_facecolor("#2f3136") # Discord dark theme background + ax.set_facecolor("#36393f") # Slightly lighter for graph area + # Plot CPU line with gradient fill - line = ax.plot(timestamps, cpu_values, color='#7289da', linewidth=2.5, marker='o', markersize=4) - ax.fill_between(timestamps, cpu_values, alpha=0.3, color='#7289da') - + ax.fill_between(timestamps, cpu_values, alpha=0.3, color="#7289da") + # Customize axes with dynamic scaling - ax.set_ylabel('CPU Usage (%)', color='#ffffff', fontsize=10) + ax.set_ylabel("CPU Usage (%)", color="#ffffff", fontsize=10) ax.set_ylim(0, cpu_scale_limit) - + # Add horizontal grid lines at 100% increments for better readability for i in range(100, cpu_scale_limit + 1, 100): - ax.axhline(y=i, color='#ffffff', alpha=0.2, linestyle='--', linewidth=0.8) - + ax.axhline(y=i, color="#ffffff", alpha=0.2, linestyle="--", linewidth=0.8) + # Format time axis - ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S')) + ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S")) ax.xaxis.set_major_locator(mdates.SecondLocator(interval=20)) - plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right', color='#ffffff', fontsize=8) - + plt.setp( + 
ax.xaxis.get_majorticklabels(), + rotation=45, + ha="right", + color="#ffffff", + fontsize=8, + ) + # Style the graph - ax.tick_params(colors='#ffffff', labelsize=8) - ax.grid(True, alpha=0.3, color='#ffffff') - ax.spines['bottom'].set_color('#ffffff') - ax.spines['left'].set_color('#ffffff') - ax.spines['top'].set_visible(False) - ax.spines['right'].set_visible(False) - + ax.tick_params(colors="#ffffff", labelsize=8) + ax.grid(True, alpha=0.3, color="#ffffff") + ax.spines["bottom"].set_color("#ffffff") + ax.spines["left"].set_color("#ffffff") + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + # Add title with scale info for multi-vCPU servers - title = f'{self.server_name} - CPU Usage' + title = f"{self.server_name} - CPU Usage" if cpu_scale_limit > 100: estimated_vcpus = cpu_scale_limit // 100 - title += f' (~{estimated_vcpus} vCPU cores)' - ax.set_title(title, color='#ffffff', fontsize=12, pad=20) - + title += f" (~{estimated_vcpus} vCPU cores)" + ax.set_title(title, color="#ffffff", fontsize=12, pad=20) + # Tight layout to prevent label cutoff plt.tight_layout() - + # Save to BytesIO img_buffer = io.BytesIO() - plt.savefig(img_buffer, format='png', facecolor='#2f3136', edgecolor='none', - bbox_inches='tight', dpi=100) + plt.savefig( + img_buffer, + format="png", + facecolor="#2f3136", + edgecolor="none", + bbox_inches="tight", + dpi=100, + ) img_buffer.seek(0) - + # Clean up matplotlib resources plt.close(fig) - + logger.debug(f"Generated CPU graph for {self.server_name} (scale: 0-{cpu_scale_limit}%)") return img_buffer - + except Exception as e: logger.error(f"Failed to generate CPU graph for {self.server_name}: {str(e)}") - plt.close('all') # Clean up any remaining figures + plt.close("all") # Clean up any remaining figures return None - + def generate_memory_graph(self) -> Optional[io.BytesIO]: """ Generate a memory usage line graph as a PNG image. 
- + Returns: BytesIO object containing PNG image data, or None if insufficient data """ if not self.has_sufficient_data: logger.debug(f"Insufficient data for memory graph generation: {self.server_name}") return None - + try: # Extract timestamps and memory data timestamps = [point[0] for point in self.data_points] memory_values = [point[2] for point in self.data_points] - + # Create figure with dark theme styling - plt.style.use('dark_background') + plt.style.use("dark_background") fig, ax = plt.subplots(figsize=(8, 4), dpi=100) - fig.patch.set_facecolor('#2f3136') # Discord dark theme background - ax.set_facecolor('#36393f') # Slightly lighter for graph area - + fig.patch.set_facecolor("#2f3136") # Discord dark theme background + ax.set_facecolor("#36393f") # Slightly lighter for graph area + # Plot memory line with gradient fill - line = ax.plot(timestamps, memory_values, color='#43b581', linewidth=2.5, marker='o', markersize=4) - ax.fill_between(timestamps, memory_values, alpha=0.3, color='#43b581') - + ax.fill_between(timestamps, memory_values, alpha=0.3, color="#43b581") + # Customize axes - ax.set_ylabel('Memory Usage (MB)', color='#ffffff', fontsize=10) + ax.set_ylabel("Memory Usage (MB)", color="#ffffff", fontsize=10) ax.set_ylim(0, max(memory_values) * 1.1) # Dynamic scaling with 10% padding - + # Format time axis - ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S')) + ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S")) ax.xaxis.set_major_locator(mdates.SecondLocator(interval=20)) - plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right', color='#ffffff', fontsize=8) - + plt.setp( + ax.xaxis.get_majorticklabels(), + rotation=45, + ha="right", + color="#ffffff", + fontsize=8, + ) + # Style the graph - ax.tick_params(colors='#ffffff', labelsize=8) - ax.grid(True, alpha=0.3, color='#ffffff') - ax.spines['bottom'].set_color('#ffffff') - ax.spines['left'].set_color('#ffffff') - ax.spines['top'].set_visible(False) - 
ax.spines['right'].set_visible(False) - + ax.tick_params(colors="#ffffff", labelsize=8) + ax.grid(True, alpha=0.3, color="#ffffff") + ax.spines["bottom"].set_color("#ffffff") + ax.spines["left"].set_color("#ffffff") + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + # Add title - ax.set_title(f'{self.server_name} - Memory Usage', color='#ffffff', fontsize=12, pad=20) - + ax.set_title( + f"{self.server_name} - Memory Usage", + color="#ffffff", + fontsize=12, + pad=20, + ) + # Tight layout to prevent label cutoff plt.tight_layout() - + # Save to BytesIO img_buffer = io.BytesIO() - plt.savefig(img_buffer, format='png', facecolor='#2f3136', edgecolor='none', - bbox_inches='tight', dpi=100) + plt.savefig( + img_buffer, + format="png", + facecolor="#2f3136", + edgecolor="none", + bbox_inches="tight", + dpi=100, + ) img_buffer.seek(0) - + # Clean up matplotlib resources plt.close(fig) - + logger.debug(f"Generated memory graph for {self.server_name}") return img_buffer - + except Exception as e: logger.error(f"Failed to generate memory graph for {self.server_name}: {str(e)}") - plt.close('all') # Clean up any remaining figures + plt.close("all") # Clean up any remaining figures return None def generate_combined_graph(self) -> Optional[io.BytesIO]: """ Generate a combined CPU and memory usage graph as a PNG image. 
- + Returns: BytesIO object containing PNG image data, or None if insufficient data """ if not self.has_sufficient_data: logger.debug(f"Insufficient data for combined graph generation: {self.server_name}") return None - + try: # Extract data timestamps = [point[0] for point in self.data_points] cpu_values = [point[1] for point in self.data_points] memory_values = [point[2] for point in self.data_points] - + # Calculate dynamic CPU scale limit max_cpu = max(cpu_values) cpu_scale_limit = self._calculate_cpu_scale_limit(max_cpu) - + # Create figure with two subplots - plt.style.use('dark_background') + plt.style.use("dark_background") fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 6), dpi=100, sharex=True) - fig.patch.set_facecolor('#2f3136') - + fig.patch.set_facecolor("#2f3136") + # CPU subplot - ax1.set_facecolor('#36393f') - ax1.plot(timestamps, cpu_values, color='#7289da', linewidth=2.5, marker='o', markersize=4) - ax1.fill_between(timestamps, cpu_values, alpha=0.3, color='#7289da') - ax1.set_ylabel('CPU Usage (%)', color='#ffffff', fontsize=10) + ax1.set_facecolor("#36393f") + ax1.plot( + timestamps, + cpu_values, + color="#7289da", + linewidth=2.5, + marker="o", + markersize=4, + ) + ax1.fill_between(timestamps, cpu_values, alpha=0.3, color="#7289da") + ax1.set_ylabel("CPU Usage (%)", color="#ffffff", fontsize=10) ax1.set_ylim(0, cpu_scale_limit) - ax1.tick_params(colors='#ffffff', labelsize=8) - ax1.grid(True, alpha=0.3, color='#ffffff') - + ax1.tick_params(colors="#ffffff", labelsize=8) + ax1.grid(True, alpha=0.3, color="#ffffff") + # Add horizontal grid lines at 100% increments for CPU subplot for i in range(100, cpu_scale_limit + 1, 100): - ax1.axhline(y=i, color='#ffffff', alpha=0.2, linestyle='--', linewidth=0.8) - + ax1.axhline(y=i, color="#ffffff", alpha=0.2, linestyle="--", linewidth=0.8) + # Title with vCPU info if applicable - title = f'{self.server_name} - Resource Usage' + title = f"{self.server_name} - Resource Usage" if cpu_scale_limit > 100: 
estimated_vcpus = cpu_scale_limit // 100 - title += f' (~{estimated_vcpus} vCPU cores)' - ax1.set_title(title, color='#ffffff', fontsize=12) - + title += f" (~{estimated_vcpus} vCPU cores)" + ax1.set_title(title, color="#ffffff", fontsize=12) + # Memory subplot - ax2.set_facecolor('#36393f') - ax2.plot(timestamps, memory_values, color='#43b581', linewidth=2.5, marker='o', markersize=4) - ax2.fill_between(timestamps, memory_values, alpha=0.3, color='#43b581') - ax2.set_ylabel('Memory (MB)', color='#ffffff', fontsize=10) + ax2.set_facecolor("#36393f") + ax2.plot( + timestamps, + memory_values, + color="#43b581", + linewidth=2.5, + marker="o", + markersize=4, + ) + ax2.fill_between(timestamps, memory_values, alpha=0.3, color="#43b581") + ax2.set_ylabel("Memory (MB)", color="#ffffff", fontsize=10) ax2.set_ylim(0, max(memory_values) * 1.1) - ax2.tick_params(colors='#ffffff', labelsize=8) - ax2.grid(True, alpha=0.3, color='#ffffff') - + ax2.tick_params(colors="#ffffff", labelsize=8) + ax2.grid(True, alpha=0.3, color="#ffffff") + # Format time axis (only on bottom subplot) - ax2.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S')) + ax2.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S")) ax2.xaxis.set_major_locator(mdates.SecondLocator(interval=20)) - plt.setp(ax2.xaxis.get_majorticklabels(), rotation=45, ha='right', color='#ffffff', fontsize=8) - + plt.setp( + ax2.xaxis.get_majorticklabels(), + rotation=45, + ha="right", + color="#ffffff", + fontsize=8, + ) + # Style both subplots for ax in [ax1, ax2]: - ax.spines['bottom'].set_color('#ffffff') - ax.spines['left'].set_color('#ffffff') - ax.spines['top'].set_visible(False) - ax.spines['right'].set_visible(False) - + ax.spines["bottom"].set_color("#ffffff") + ax.spines["left"].set_color("#ffffff") + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + plt.tight_layout() - + # Save to BytesIO img_buffer = io.BytesIO() - plt.savefig(img_buffer, format='png', facecolor='#2f3136', 
edgecolor='none', - bbox_inches='tight', dpi=100) + plt.savefig( + img_buffer, + format="png", + facecolor="#2f3136", + edgecolor="none", + bbox_inches="tight", + dpi=100, + ) img_buffer.seek(0) - + plt.close(fig) - + logger.debug(f"Generated combined graph for {self.server_name} (CPU scale: 0-{cpu_scale_limit}%)") return img_buffer - + except Exception as e: logger.error(f"Failed to generate combined graph for {self.server_name}: {str(e)}") - plt.close('all') + plt.close("all") return None - + def get_data_summary(self) -> Dict[str, any]: """ Get summary statistics for the current data points. - + Returns: Dictionary containing data point count, latest values, and trends """ if not self.data_points: return { - 'point_count': 0, - 'has_data': False, - 'latest_cpu': 0, - 'latest_memory': 0 + "point_count": 0, + "has_data": False, + "latest_cpu": 0, + "latest_memory": 0, } - + # Get latest values latest_point = self.data_points[-1] latest_cpu = latest_point[1] latest_memory = latest_point[2] - + # Calculate CPU scale info max_cpu = max(point[1] for point in self.data_points) cpu_scale_limit = self._calculate_cpu_scale_limit(max_cpu) estimated_vcpus = cpu_scale_limit // 100 - + # Calculate trends if we have multiple points - cpu_trend = 'stable' - memory_trend = 'stable' - + cpu_trend = "stable" + memory_trend = "stable" + if len(self.data_points) >= 2: first_point = self.data_points[0] cpu_change = latest_cpu - first_point[1] memory_change = latest_memory - first_point[2] - + # Determine trends (>5% change considered significant) if abs(cpu_change) > 5: - cpu_trend = 'increasing' if cpu_change > 0 else 'decreasing' - + cpu_trend = "increasing" if cpu_change > 0 else "decreasing" + if abs(memory_change) > 50: # 50MB change threshold - memory_trend = 'increasing' if memory_change > 0 else 'decreasing' - + memory_trend = "increasing" if memory_change > 0 else "decreasing" + return { - 'point_count': len(self.data_points), - 'has_data': self.has_sufficient_data, - 
'latest_cpu': latest_cpu, - 'latest_memory': latest_memory, - 'cpu_trend': cpu_trend, - 'memory_trend': memory_trend, - 'cpu_scale_limit': cpu_scale_limit, - 'estimated_vcpus': estimated_vcpus, - 'time_span_minutes': len(self.data_points) * 10 / 60 # Convert to minutes + "point_count": len(self.data_points), + "has_data": self.has_sufficient_data, + "latest_cpu": latest_cpu, + "latest_memory": latest_memory, + "cpu_trend": cpu_trend, + "memory_trend": memory_trend, + "cpu_scale_limit": cpu_scale_limit, + "estimated_vcpus": estimated_vcpus, + "time_span_minutes": len(self.data_points) * 10 / 60, # Convert to minutes } class ServerMetricsManager: """ Global manager for all server metrics graphs. - + Handles: - Creation and cleanup of ServerMetricsGraphs instances - Bulk operations across all tracked servers - Memory management for graph storage """ - + def __init__(self): """Initialize the metrics manager.""" self.server_graphs: Dict[str, ServerMetricsGraphs] = {} logger.info("Initialized ServerMetricsManager") - + def get_or_create_server_graphs(self, server_id: str, server_name: str) -> ServerMetricsGraphs: """ Get existing ServerMetricsGraphs instance or create a new one. - + Args: server_id: Pterodactyl server identifier server_name: Human-readable server name - + Returns: ServerMetricsGraphs instance for the specified server """ if server_id not in self.server_graphs: self.server_graphs[server_id] = ServerMetricsGraphs(server_id, server_name) logger.debug(f"Created new metrics graphs for server {server_name}") - + return self.server_graphs[server_id] - + def add_server_data(self, server_id: str, server_name: str, cpu_percent: float, memory_mb: float): """ Add data point to a server's metrics tracking. 
- + Args: server_id: Pterodactyl server identifier server_name: Human-readable server name @@ -416,34 +472,34 @@ class ServerMetricsManager: """ graphs = self.get_or_create_server_graphs(server_id, server_name) graphs.add_data_point(cpu_percent, memory_mb) - + def remove_server(self, server_id: str): """ Remove a server from metrics tracking. - + Args: server_id: Pterodactyl server identifier to remove """ if server_id in self.server_graphs: del self.server_graphs[server_id] logger.debug(f"Removed metrics tracking for server {server_id}") - + def get_server_graphs(self, server_id: str) -> Optional[ServerMetricsGraphs]: """ Get ServerMetricsGraphs instance for a specific server. - + Args: server_id: Pterodactyl server identifier - + Returns: ServerMetricsGraphs instance or None if not found """ return self.server_graphs.get(server_id) - + def cleanup_old_servers(self, active_server_ids: list): """ Remove tracking for servers that no longer exist. - + Args: active_server_ids: List of currently active server IDs """ @@ -451,22 +507,22 @@ class ServerMetricsManager: for server_id in self.server_graphs: if server_id not in active_server_ids: servers_to_remove.append(server_id) - + for server_id in servers_to_remove: self.remove_server(server_id) - + if servers_to_remove: logger.info(f"Cleaned up metrics for {len(servers_to_remove)} inactive servers") - + def get_summary(self) -> Dict[str, any]: """ Get summary of all tracked servers. 
- + Returns: Dictionary with tracking statistics """ return { - 'total_servers': len(self.server_graphs), - 'servers_with_data': sum(1 for graphs in self.server_graphs.values() if graphs.has_sufficient_data), - 'total_data_points': sum(len(graphs.data_points) for graphs in self.server_graphs.values()) - } \ No newline at end of file + "total_servers": len(self.server_graphs), + "servers_with_data": sum(1 for graphs in self.server_graphs.values() if graphs.has_sufficient_data), + "total_data_points": sum(len(graphs.data_points) for graphs in self.server_graphs.values()), + } diff --git a/test_pterodisbot.py b/test_pterodisbot.py new file mode 100644 index 0000000..36de647 --- /dev/null +++ b/test_pterodisbot.py @@ -0,0 +1,797 @@ +""" +Unit and Integration Tests for Pterodactyl Discord Bot + +Test coverage: +- Configuration validation +- Pterodactyl API client operations +- Discord bot commands and interactions +- Server metrics tracking +- Embed management +- Error handling +""" + +import pytest +import asyncio +import json +import os +from unittest.mock import Mock, AsyncMock, patch, MagicMock +from datetime import datetime +import configparser +import discord +from discord.ext import commands +import aiohttp + +# Import the modules to test +import sys +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from pterodisbot import ( + PterodactylAPI, + ServerStatusView, + PterodactylBot, + ConfigValidationError, + validate_config, + REQUIRED_ROLE +) +from server_metrics_graphs import ServerMetricsGraphs, ServerMetricsManager + + +# ========================================== +# FIXTURES +# ========================================== + +@pytest.fixture +def mock_config(): + """ + Create a mock configuration for testing. 
+ + Returns: + ConfigParser: A properly configured test configuration object + """ + config = configparser.ConfigParser() + config['Pterodactyl'] = { + 'PanelURL': 'https://panel.example.com', + 'ClientAPIKey': 'ptlc_test_client_key_123', + 'ApplicationAPIKey': 'ptla_test_app_key_456' + } + config['Discord'] = { + 'Token': 'test_discord_token', + 'AllowedGuildID': '123456789' + } + return config + + +@pytest.fixture +def mock_pterodactyl_api(): + """ + Create a mock PterodactylAPI instance with properly configured session. + + Returns: + PterodactylAPI: A mocked API instance ready for testing + """ + api = PterodactylAPI( + 'https://panel.example.com', + 'ptlc_test_client_key', + 'ptla_test_app_key' + ) + # Create a proper async mock session + api.session = AsyncMock(spec=aiohttp.ClientSession) + api.session.close = AsyncMock() # Ensure close is an async mock + return api + + +@pytest.fixture +def sample_server_data(): + """ + Sample server data from Pterodactyl API. + + Returns: + dict: Server attributes in Pterodactyl API format + """ + return { + 'attributes': { + 'identifier': 'abc123', + 'name': 'Test Server', + 'description': 'A test game server', + 'suspended': False, + 'limits': { + 'cpu': 200, + 'memory': 2048, + 'disk': 10240 + } + } + } + + +@pytest.fixture +def sample_resources_data(): + """ + Sample resource usage data from Pterodactyl API. + + Returns: + dict: Resource usage attributes in Pterodactyl API format + """ + return { + 'attributes': { + 'current_state': 'running', + 'resources': { + 'cpu_absolute': 45.5, + 'memory_bytes': 1073741824, # 1GB + 'disk_bytes': 5368709120, # 5GB + 'network_rx_bytes': 10485760, # 10MB + 'network_tx_bytes': 5242880, # 5MB + 'uptime': 3600000 # 1 hour in milliseconds + } + } + } + + +@pytest.fixture +def mock_discord_interaction(): + """ + Create a mock Discord interaction with properly configured user roles. 
+ + Returns: + AsyncMock: A mocked Discord interaction object + """ + interaction = AsyncMock(spec=discord.Interaction) + interaction.user = Mock() + interaction.user.name = 'TestUser' + + # Create mock role with proper name attribute + mock_role = Mock() + mock_role.name = REQUIRED_ROLE + interaction.user.roles = [mock_role] + + interaction.guild_id = 123456789 + interaction.channel = Mock() + interaction.channel.id = 987654321 + interaction.response = AsyncMock() + interaction.followup = AsyncMock() + return interaction + + +# ========================================== +# CONFIGURATION VALIDATION TESTS +# ========================================== + +class TestConfigValidation: + """Test configuration validation logic.""" + + def test_valid_config(self, mock_config, monkeypatch): + """ + Test that valid configuration passes validation. + + Args: + mock_config: Pytest fixture providing valid config + monkeypatch: Pytest monkeypatch fixture for patching + """ + monkeypatch.setattr('pterodisbot.config', mock_config) + + # Should not raise any exceptions + try: + validate_config() + except ConfigValidationError: + pytest.fail("Valid configuration should not raise ConfigValidationError") + + def test_missing_pterodactyl_section(self, monkeypatch): + """ + Test validation fails with missing Pterodactyl section. + + Args: + monkeypatch: Pytest monkeypatch fixture for patching + """ + config = configparser.ConfigParser() + config['Discord'] = { + 'Token': 'test_token', + 'AllowedGuildID': '123456789' + } + monkeypatch.setattr('pterodisbot.config', config) + + with pytest.raises(ConfigValidationError, match="Missing \\[Pterodactyl\\] section"): + validate_config() + + def test_invalid_api_key_prefix(self, mock_config, monkeypatch): + """ + Test validation fails with incorrect API key prefix. 
+ + Args: + mock_config: Pytest fixture providing config + monkeypatch: Pytest monkeypatch fixture for patching + """ + mock_config['Pterodactyl']['ClientAPIKey'] = 'invalid_prefix_key' + monkeypatch.setattr('pterodisbot.config', mock_config) + + with pytest.raises(ConfigValidationError, match="ClientAPIKey should start with 'ptlc_'"): + validate_config() + + def test_invalid_guild_id(self, mock_config, monkeypatch): + """ + Test validation fails with invalid guild ID. + + Args: + mock_config: Pytest fixture providing config + monkeypatch: Pytest monkeypatch fixture for patching + """ + mock_config['Discord']['AllowedGuildID'] = 'not_a_number' + monkeypatch.setattr('pterodisbot.config', mock_config) + + with pytest.raises(ConfigValidationError, match="AllowedGuildID must be a valid integer"): + validate_config() + + def test_invalid_panel_url(self, mock_config, monkeypatch): + """ + Test validation fails with invalid panel URL. + + Args: + mock_config: Pytest fixture providing config + monkeypatch: Pytest monkeypatch fixture for patching + """ + mock_config['Pterodactyl']['PanelURL'] = 'not-a-url' + monkeypatch.setattr('pterodisbot.config', mock_config) + + with pytest.raises(ConfigValidationError, match="PanelURL must start with http"): + validate_config() + + +# ========================================== +# PTERODACTYL API TESTS +# ========================================== + +class TestPterodactylAPI: + """Test Pterodactyl API client functionality.""" + + @pytest.mark.asyncio + async def test_initialize(self): + """ + Test API client initialization. 
+ + Verifies that the API client properly creates an aiohttp session + """ + api = PterodactylAPI('https://panel.example.com', 'ptlc_key', 'ptla_key') + await api.initialize() + + assert api.session is not None + assert isinstance(api.session, aiohttp.ClientSession) + + await api.close() + + @pytest.mark.asyncio + async def test_close(self, mock_pterodactyl_api): + """ + Test API client cleanup properly calls session.close(). + + Args: + mock_pterodactyl_api: Pytest fixture providing mocked API instance + """ + # Ensure the session is marked as not closed + mock_pterodactyl_api.session.closed = False + + await mock_pterodactyl_api.close() + + # Verify close was called once + mock_pterodactyl_api.session.close.assert_called_once() + + @pytest.mark.asyncio + async def test_request_success(self, mock_pterodactyl_api): + """ + Test successful API request with properly mocked context manager. + + Args: + mock_pterodactyl_api: Pytest fixture providing mocked API instance + """ + # Create a mock response + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={'data': 'test'}) + + # Create a mock context manager that returns the response + mock_context = AsyncMock() + mock_context.__aenter__.return_value = mock_response + mock_context.__aexit__.return_value = AsyncMock() + + # Configure the session.request to return the context manager + mock_pterodactyl_api.session.request = Mock(return_value=mock_context) + + result = await mock_pterodactyl_api._request('GET', 'test/endpoint') + + assert result == {'data': 'test'} + mock_pterodactyl_api.session.request.assert_called_once() + + @pytest.mark.asyncio + async def test_request_error(self, mock_pterodactyl_api): + """ + Test API request error handling with properly mocked context manager. 
+ + Args: + mock_pterodactyl_api: Pytest fixture providing mocked API instance + """ + # Create a mock error response + mock_response = AsyncMock() + mock_response.status = 404 + mock_response.json = AsyncMock(return_value={ + 'errors': [{'detail': 'Server not found'}] + }) + + # Create a mock context manager that returns the error response + mock_context = AsyncMock() + mock_context.__aenter__.return_value = mock_response + mock_context.__aexit__.return_value = AsyncMock() + + # Configure the session.request to return the context manager + mock_pterodactyl_api.session.request = Mock(return_value=mock_context) + + result = await mock_pterodactyl_api._request('GET', 'test/endpoint') + + assert result['status'] == 'error' + assert 'Server not found' in result['message'] + + @pytest.mark.asyncio + async def test_get_servers(self, mock_pterodactyl_api, sample_server_data): + """ + Test retrieving server list from API. + + Args: + mock_pterodactyl_api: Pytest fixture providing mocked API instance + sample_server_data: Pytest fixture providing sample server data + """ + mock_pterodactyl_api._request = AsyncMock(return_value={ + 'data': [sample_server_data] + }) + + servers = await mock_pterodactyl_api.get_servers() + + assert len(servers) == 1 + assert servers[0] == sample_server_data + mock_pterodactyl_api._request.assert_called_once_with( + 'GET', 'application/servers', use_application_key=True + ) + + @pytest.mark.asyncio + async def test_get_server_resources(self, mock_pterodactyl_api, sample_resources_data): + """ + Test retrieving server resource usage from API. 
+ + Args: + mock_pterodactyl_api: Pytest fixture providing mocked API instance + sample_resources_data: Pytest fixture providing sample resource data + """ + mock_pterodactyl_api._request = AsyncMock(return_value=sample_resources_data) + + resources = await mock_pterodactyl_api.get_server_resources('abc123') + + assert resources['attributes']['current_state'] == 'running' + mock_pterodactyl_api._request.assert_called_once_with( + 'GET', 'client/servers/abc123/resources' + ) + + @pytest.mark.asyncio + async def test_send_power_action_valid(self, mock_pterodactyl_api): + """ + Test sending valid power action to server. + + Args: + mock_pterodactyl_api: Pytest fixture providing mocked API instance + """ + mock_pterodactyl_api._request = AsyncMock(return_value={'status': 'success'}) + + result = await mock_pterodactyl_api.send_power_action('abc123', 'start') + + assert result['status'] == 'success' + mock_pterodactyl_api._request.assert_called_once_with( + 'POST', 'client/servers/abc123/power', {'signal': 'start'} + ) + + @pytest.mark.asyncio + async def test_send_power_action_invalid(self, mock_pterodactyl_api): + """ + Test sending invalid power action returns error. + + Args: + mock_pterodactyl_api: Pytest fixture providing mocked API instance + """ + result = await mock_pterodactyl_api.send_power_action('abc123', 'invalid_action') + + assert result['status'] == 'error' + assert 'Invalid action' in result['message'] + + +# ========================================== +# SERVER METRICS TESTS +# ========================================== + +class TestServerMetricsGraphs: + """Test server metrics tracking and graphing.""" + + def test_initialization(self): + """ + Test metrics graph initialization with empty state. 
+ """ + graphs = ServerMetricsGraphs('abc123', 'Test Server') + + assert graphs.server_id == 'abc123' + assert graphs.server_name == 'Test Server' + assert len(graphs.data_points) == 0 + assert graphs.has_sufficient_data is False + + def test_add_data_point(self): + """ + Test adding data points and checking sufficient data threshold. + """ + graphs = ServerMetricsGraphs('abc123', 'Test Server') + + graphs.add_data_point(50.0, 1024.0) + + assert len(graphs.data_points) == 1 + assert graphs.has_sufficient_data is False + + graphs.add_data_point(55.0, 1100.0) + + assert len(graphs.data_points) == 2 + assert graphs.has_sufficient_data is True + + def test_data_rotation(self): + """ + Test automatic data point rotation (FIFO with maxlen=6). + """ + graphs = ServerMetricsGraphs('abc123', 'Test Server') + + # Add 8 data points to test rotation + for i in range(8): + graphs.add_data_point(float(i * 10), float(i * 100)) + + # Should only keep the last 6 + assert len(graphs.data_points) == 6 + assert graphs.data_points[0][1] == 20.0 # CPU of 3rd point + assert graphs.data_points[-1][1] == 70.0 # CPU of 8th point + + def test_cpu_scale_calculation(self): + """ + Test dynamic CPU scale limit calculation for multi-vCPU servers. + """ + graphs = ServerMetricsGraphs('abc123', 'Test Server') + + # Test single vCPU (<=100%) + assert graphs._calculate_cpu_scale_limit(75.0) == 100 + assert graphs._calculate_cpu_scale_limit(100.0) == 100 + + # Test multi-vCPU scenarios + assert graphs._calculate_cpu_scale_limit(150.0) == 200 + assert graphs._calculate_cpu_scale_limit(250.0) == 300 + assert graphs._calculate_cpu_scale_limit(350.0) == 400 + + def test_get_data_summary(self): + """ + Test data summary generation including trends. 
+ """ + graphs = ServerMetricsGraphs('abc123', 'Test Server') + + # No data case + summary = graphs.get_data_summary() + assert summary['point_count'] == 0 + assert summary['has_data'] is False + + # Add data points with increasing trend + graphs.add_data_point(50.0, 1000.0) + graphs.add_data_point(60.0, 1100.0) + + summary = graphs.get_data_summary() + assert summary['point_count'] == 2 + assert summary['has_data'] is True + assert summary['latest_cpu'] == 60.0 + assert summary['latest_memory'] == 1100.0 + assert summary['cpu_trend'] == 'increasing' + + def test_generate_graph_insufficient_data(self): + """ + Test graph generation returns None with insufficient data. + """ + graphs = ServerMetricsGraphs('abc123', 'Test Server') + + # Only one data point - should return None + graphs.add_data_point(50.0, 1000.0) + + assert graphs.generate_cpu_graph() is None + assert graphs.generate_memory_graph() is None + assert graphs.generate_combined_graph() is None + + +class TestServerMetricsManager: + """Test server metrics manager.""" + + def test_initialization(self): + """ + Test manager initialization with empty state. + """ + manager = ServerMetricsManager() + assert len(manager.server_graphs) == 0 + + def test_get_or_create_server_graphs(self): + """ + Test getting or creating server graphs returns same instance. + """ + manager = ServerMetricsManager() + + graphs1 = manager.get_or_create_server_graphs('abc123', 'Test Server') + graphs2 = manager.get_or_create_server_graphs('abc123', 'Test Server') + + assert graphs1 is graphs2 # Should return same instance + assert len(manager.server_graphs) == 1 + + def test_add_server_data(self): + """ + Test adding data through manager properly creates graphs. 
+ """ + manager = ServerMetricsManager() + + manager.add_server_data('abc123', 'Test Server', 50.0, 1024.0) + + graphs = manager.get_server_graphs('abc123') + assert graphs is not None + assert len(graphs.data_points) == 1 + + def test_remove_server(self): + """ + Test removing server from tracking. + """ + manager = ServerMetricsManager() + + manager.add_server_data('abc123', 'Test Server', 50.0, 1024.0) + assert 'abc123' in manager.server_graphs + + manager.remove_server('abc123') + assert 'abc123' not in manager.server_graphs + + def test_cleanup_old_servers(self): + """ + Test cleanup of inactive servers not in active list. + """ + manager = ServerMetricsManager() + + # Add data for 3 servers + manager.add_server_data('server1', 'Server 1', 50.0, 1024.0) + manager.add_server_data('server2', 'Server 2', 60.0, 2048.0) + manager.add_server_data('server3', 'Server 3', 70.0, 3072.0) + + # Only server1 and server2 are still active + manager.cleanup_old_servers(['server1', 'server2']) + + assert 'server1' in manager.server_graphs + assert 'server2' in manager.server_graphs + assert 'server3' not in manager.server_graphs + + def test_get_summary(self): + """ + Test getting manager summary with statistics. 
+ """ + manager = ServerMetricsManager() + + # Add some servers with varying data + manager.add_server_data('server1', 'Server 1', 50.0, 1024.0) + manager.add_server_data('server1', 'Server 1', 55.0, 1100.0) + manager.add_server_data('server2', 'Server 2', 60.0, 2048.0) + + summary = manager.get_summary() + assert summary['total_servers'] == 2 + assert summary['servers_with_data'] == 1 # Only server1 has >=2 points + assert summary['total_data_points'] == 3 + + +# ========================================== +# DISCORD BOT TESTS +# ========================================== + +class TestServerStatusView: + """Test Discord UI view for server status.""" + + @pytest.mark.asyncio + async def test_view_initialization(self, mock_pterodactyl_api, sample_server_data): + """ + Test view initialization with server data. + + Args: + mock_pterodactyl_api: Pytest fixture providing mocked API instance + sample_server_data: Pytest fixture providing sample server data + """ + view = ServerStatusView( + 'abc123', + 'Test Server', + mock_pterodactyl_api, + sample_server_data + ) + + assert view.server_id == 'abc123' + assert view.server_name == 'Test Server' + assert view.api is mock_pterodactyl_api + + @pytest.mark.asyncio + async def test_interaction_check_authorized(self, mock_pterodactyl_api, + sample_server_data, mock_discord_interaction): + """ + Test interaction check with authorized user having required role. 
+ + Args: + mock_pterodactyl_api: Pytest fixture providing mocked API instance + sample_server_data: Pytest fixture providing sample server data + mock_discord_interaction: Pytest fixture providing mocked Discord interaction + """ + view = ServerStatusView('abc123', 'Test Server', + mock_pterodactyl_api, sample_server_data) + + result = await view.interaction_check(mock_discord_interaction) + + assert result is True + + @pytest.mark.asyncio + async def test_interaction_check_wrong_guild(self, mock_pterodactyl_api, + sample_server_data, mock_discord_interaction): + """ + Test interaction check rejects wrong guild. + + Args: + mock_pterodactyl_api: Pytest fixture providing mocked API instance + sample_server_data: Pytest fixture providing sample server data + mock_discord_interaction: Pytest fixture providing mocked Discord interaction + """ + view = ServerStatusView('abc123', 'Test Server', + mock_pterodactyl_api, sample_server_data) + + mock_discord_interaction.guild_id = 999999999 # Wrong guild + + result = await view.interaction_check(mock_discord_interaction) + + assert result is False + mock_discord_interaction.response.send_message.assert_called_once() + + +class TestPterodactylBot: + """Test main bot class.""" + + @pytest.mark.asyncio + async def test_bot_initialization(self): + """ + Test bot initialization with default values. + """ + intents = discord.Intents.default() + bot = PterodactylBot(command_prefix="!", intents=intents) + + assert bot.server_cache == {} + assert bot.embed_locations == {} + assert bot.metrics_manager is not None + + @pytest.mark.asyncio + async def test_track_new_embed(self): + """ + Test tracking new embed location in storage. 
+ """ + intents = discord.Intents.default() + bot = PterodactylBot(command_prefix="!", intents=intents) + + mock_message = Mock() + mock_message.channel = Mock() + mock_message.channel.id = 123456 + mock_message.id = 789012 + + with patch.object(bot, 'save_embed_locations', new=AsyncMock()): + await bot.track_new_embed('abc123', mock_message) + + assert 'abc123' in bot.embed_locations + assert bot.embed_locations['abc123']['channel_id'] == '123456' + assert bot.embed_locations['abc123']['message_id'] == '789012' + + @pytest.mark.asyncio + async def test_load_embed_locations(self, tmp_path): + """ + Test loading embed locations from JSON file. + + Args: + tmp_path: Pytest fixture providing temporary directory + """ + intents = discord.Intents.default() + bot = PterodactylBot(command_prefix="!", intents=intents) + + # Create temporary embed locations file + embed_file = tmp_path / "embed_locations.json" + test_data = { + 'abc123': { + 'channel_id': '123456', + 'message_id': '789012' + } + } + embed_file.write_text(json.dumps(test_data)) + + bot.embed_storage_path = embed_file + await bot.load_embed_locations() + + assert 'abc123' in bot.embed_locations + assert bot.embed_locations['abc123']['channel_id'] == '123456' + + @pytest.mark.asyncio + async def test_save_embed_locations(self, tmp_path): + """ + Test saving embed locations to JSON file. 
+ + Args: + tmp_path: Pytest fixture providing temporary directory + """ + intents = discord.Intents.default() + bot = PterodactylBot(command_prefix="!", intents=intents) + + embed_file = tmp_path / "embed_locations.json" + bot.embed_storage_path = embed_file + + bot.embed_locations = { + 'abc123': { + 'channel_id': '123456', + 'message_id': '789012' + } + } + + await bot.save_embed_locations() + + assert embed_file.exists() + loaded_data = json.loads(embed_file.read_text()) + assert loaded_data == bot.embed_locations + + +# ========================================== +# INTEGRATION TESTS +# ========================================== + +class TestIntegration: + """Integration tests for complete workflows.""" + + @pytest.mark.asyncio + async def test_server_status_command_flow(self, mock_discord_interaction, + sample_server_data, sample_resources_data): + """ + Test complete server status command flow. + + Args: + mock_discord_interaction: Pytest fixture providing mocked Discord interaction + sample_server_data: Pytest fixture providing sample server data + sample_resources_data: Pytest fixture providing sample resource data + """ + # This would require extensive mocking of Discord.py internals + # Simplified test to verify command registration + + intents = discord.Intents.default() + bot = PterodactylBot(command_prefix="!", intents=intents) + + # Verify command exists in tree + assert bot.tree is not None + + @pytest.mark.asyncio + async def test_metrics_collection_and_graphing(self): + """ + Test complete metrics collection and graph generation flow. 
+ """ + manager = ServerMetricsManager() + + # Simulate data collection over time + for i in range(6): + cpu = 50.0 + (i * 5) + memory = 1000.0 + (i * 100) + manager.add_server_data('test_server', 'Test Server', cpu, memory) + + graphs = manager.get_server_graphs('test_server') + assert graphs is not None + assert graphs.has_sufficient_data + + # Generate graphs + cpu_graph = graphs.generate_cpu_graph() + memory_graph = graphs.generate_memory_graph() + combined_graph = graphs.generate_combined_graph() + + # Verify graphs were generated + assert cpu_graph is not None + assert memory_graph is not None + assert combined_graph is not None + + +# ========================================== +# RUN TESTS +# ========================================== + +if __name__ == '__main__': + pytest.main([__file__, '-v', '--tb=short']) \ No newline at end of file