12 Commits

Author SHA1 Message Date
11ee1447de Fix: Unit test async mock
All checks were successful
CI/CD Pipeline / Unit Tests (Python 3.10) (push) Successful in 9m20s
CI/CD Pipeline / Unit Tests (Python 3.11) (push) Successful in 9m17s
CI/CD Pipeline / Unit Tests (Python 3.9) (push) Successful in 9m19s
CI/CD Pipeline / Code Quality & Linting (push) Successful in 43s
CI/CD Pipeline / Security Scanning (push) Successful in 15s
CI/CD Pipeline / Integration Tests (push) Successful in 9m13s
CI/CD Pipeline / Build Docker Image (push) Successful in 10m37s
CI/CD Pipeline / Generate Test Report (push) Successful in 3s
CI/CD Pipeline / CI/CD Pipeline Status (push) Successful in 1s
2025-10-24 14:22:00 +00:00
08dee3db99 Add: CI/CD testing automation
Some checks failed
CI/CD Pipeline / Unit Tests (Python 3.10) (push) Failing after 6m34s
CI/CD Pipeline / Unit Tests (Python 3.11) (push) Failing after 5m31s
CI/CD Pipeline / Unit Tests (Python 3.9) (push) Failing after 5m44s
CI/CD Pipeline / Code Quality & Linting (push) Successful in 48s
CI/CD Pipeline / Security Scanning (push) Successful in 17s
CI/CD Pipeline / Integration Tests (push) Has been skipped
CI/CD Pipeline / Build Docker Image (push) Has been skipped
CI/CD Pipeline / Generate Test Report (push) Successful in 13s
CI/CD Pipeline / CI/CD Pipeline Status (push) Successful in 1s
2025-10-23 13:20:39 +00:00
55971496c8 Fix: Matplotlib permission error
All checks were successful
Docker Build and Push (Multi-architecture) / build-and-push (push) Successful in 34s
2025-09-29 05:07:24 +00:00
135d596119 Fix: Dependency update for Docker image
All checks were successful
Docker Build and Push (Multi-architecture) / build-and-push (push) Successful in 1m53s
2025-09-29 04:52:17 +00:00
205c8eb9b7 Fix: Dependency update for Docker image
All checks were successful
Docker Build and Push (Multi-architecture) / build-and-push (push) Successful in 2m51s
2025-09-29 04:40:19 +00:00
ce77639a47 Add: Dynamic graph scaling for multi vCPU
All checks were successful
Docker Build and Push (Multi-architecture) / build-and-push (push) Successful in 33s
2025-09-29 04:07:33 +00:00
ce4887bae3 Fix: Delete graph image if server not running
All checks were successful
Docker Build and Push (Multi-architecture) / build-and-push (push) Successful in 36s
2025-09-28 18:41:50 +00:00
ca9e88f1e2 Improve formatting in embeds
All checks were successful
Docker Build and Push (Multi-architecture) / build-and-push (push) Successful in 34s
2025-09-28 17:10:03 +00:00
4b400fea1f Add server metrics graphing feature
All checks were successful
Docker Build and Push (Multi-architecture) / build-and-push (push) Successful in 5m25s
2025-09-28 16:11:43 +00:00
cbb951d121 Update embed text formatting
All checks were successful
Docker Build and Push (Multi-architecture) / build-and-push (push) Successful in 28s
2025-09-26 12:38:47 +00:00
174c27c933 Redesign server embed
All checks were successful
Docker Build and Push (Multi-architecture) / build-and-push (push) Successful in 28s
2025-09-26 10:43:03 +00:00
1f7f211e36 Add server uptime metric to embed
All checks were successful
Docker Build and Push (Multi-architecture) / build-and-push (push) Successful in 23s
2025-09-26 08:52:41 +00:00
10 changed files with 3648 additions and 790 deletions

400
.gitea/workflows/ci-cd.yml Normal file
View File

@@ -0,0 +1,400 @@
name: CI/CD Pipeline
on:
push:
branches: [ main, experimental, dev ]
tags: [ 'v*.*.*' ]
pull_request:
branches: [ main ]
workflow_dispatch:
inputs:
skip_tests:
description: 'Skip tests'
required: false
default: 'false'
type: boolean
image_tag:
description: 'Custom tag for Docker image'
required: false
default: 'latest'
type: string
jobs:
# ==========================================
# TESTING STAGE
# ==========================================
unit-tests:
name: Unit Tests (Python ${{ matrix.python-version }})
runs-on: ubuntu-latest
if: ${{ !inputs.skip_tests }}
strategy:
fail-fast: false
matrix:
python-version: ['3.9', '3.10', '3.11']
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Cache pip dependencies
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-py${{ matrix.python-version }}-pip-${{ hashFiles('requirements.txt', 'requirements-test.txt') }}
restore-keys: |
${{ runner.os }}-py${{ matrix.python-version }}-pip-
- name: Install dependencies
run: |
python -m pip install --upgrade pip setuptools wheel
pip install -r requirements.txt
pip install -r requirements-test.txt
- name: Create test configuration
run: |
mkdir -p embed logs
cat > config.ini << EOF
[Pterodactyl]
PanelURL = https://panel.example.com
ClientAPIKey = ptlc_test_client_key_123456789
ApplicationAPIKey = ptla_test_app_key_987654321
[Discord]
Token = test_discord_token_placeholder
AllowedGuildID = 123456789
EOF
- name: Run unit tests with coverage
run: |
pytest test_pterodisbot.py \
-v \
--tb=short \
--cov=pterodisbot \
--cov=server_metrics_graphs \
--cov-report=xml \
--cov-report=term \
--cov-report=html \
--junitxml=test-results-${{ matrix.python-version }}.xml
- name: Upload coverage to artifacts
uses: actions/upload-artifact@v3
with:
name: coverage-report-py${{ matrix.python-version }}
path: |
coverage.xml
htmlcov/
test-results-${{ matrix.python-version }}.xml
code-quality:
name: Code Quality & Linting
runs-on: ubuntu-latest
if: ${{ !inputs.skip_tests }}
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install linting tools
run: |
python -m pip install --upgrade pip
pip install flake8 pylint black isort mypy
- name: Run flake8
run: |
flake8 pterodisbot.py server_metrics_graphs.py \
--max-line-length=120 \
--ignore=E501,W503,E203 \
--exclude=venv,__pycache__,build,dist \
--statistics \
--output-file=flake8-report.txt
continue-on-error: true
- name: Run pylint
run: |
pylint pterodisbot.py server_metrics_graphs.py \
--disable=C0111,C0103,R0913,R0914,R0915,W0718 \
--max-line-length=120 \
--output-format=text \
--reports=y > pylint-report.txt || true
continue-on-error: true
- name: Check code formatting with black
run: |
black --check --line-length=120 --diff pterodisbot.py server_metrics_graphs.py | tee black-report.txt
continue-on-error: true
- name: Check import ordering
run: |
isort --check-only --profile black --line-length=120 pterodisbot.py server_metrics_graphs.py
continue-on-error: true
- name: Type checking with mypy
run: |
mypy pterodisbot.py server_metrics_graphs.py --ignore-missing-imports > mypy-report.txt || true
continue-on-error: true
- name: Upload linting reports
uses: actions/upload-artifact@v3
with:
name: code-quality-reports
path: |
flake8-report.txt
pylint-report.txt
black-report.txt
mypy-report.txt
security-scan:
name: Security Scanning
runs-on: ubuntu-latest
if: ${{ !inputs.skip_tests }}
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install security tools
run: |
python -m pip install --upgrade pip
pip install bandit safety pip-audit
- name: Run bandit security scan
run: |
bandit -r . \
-f json \
-o bandit-report.json \
-ll \
--exclude ./venv,./test_*.py,./tests
continue-on-error: true
- name: Run safety dependency check
run: |
pip install -r requirements.txt
safety check --json --output safety-report.json || true
continue-on-error: true
- name: Run pip-audit
run: |
pip-audit --desc --format json --output pip-audit-report.json || true
continue-on-error: true
- name: Upload security reports
uses: actions/upload-artifact@v3
with:
name: security-reports
path: |
bandit-report.json
safety-report.json
pip-audit-report.json
integration-tests:
name: Integration Tests
runs-on: ubuntu-latest
needs: [unit-tests]
if: ${{ !inputs.skip_tests }}
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Cache dependencies
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-integration-${{ hashFiles('requirements.txt') }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-test.txt
- name: Create test configuration
run: |
mkdir -p embed logs
cat > config.ini << EOF
[Pterodactyl]
PanelURL = https://panel.example.com
ClientAPIKey = ptlc_test_client_key_123456789
ApplicationAPIKey = ptla_test_app_key_987654321
[Discord]
Token = test_discord_token_placeholder
AllowedGuildID = 123456789
EOF
- name: Run integration tests
run: |
pytest test_pterodisbot.py::TestIntegration \
-v \
--tb=short \
--timeout=60
# ==========================================
# BUILD STAGE
# ==========================================
docker-build:
name: Build Docker Image
runs-on: ubuntu-latest
needs: [unit-tests, code-quality, security-scan]
if: |
always() &&
(needs.unit-tests.result == 'success' || inputs.skip_tests) &&
(github.event_name == 'push' || github.event_name == 'workflow_dispatch')
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
with:
platforms: arm64
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
with:
platforms: linux/amd64,linux/arm64
driver-opts: |
image=moby/buildkit:latest
- name: Log in to registry
uses: docker/login-action@v2
with:
registry: ${{ vars.REGISTRY }}
username: ${{ secrets.REGISTRY_USERNAME }}
password: ${{ secrets.REGISTRY_PASSWORD }}
- name: Generate Docker image tags
id: tags
run: |
IMAGE_NAME="${{ vars.REGISTRY }}/${{ github.repository_owner }}/${{ vars.IMAGE_NAME }}"
if [ -n "${{ github.event.inputs.image_tag }}" ]; then
PRIMARY_TAG="${{ github.event.inputs.image_tag }}"
elif [[ ${{ github.ref }} == refs/tags/v* ]]; then
PRIMARY_TAG="${GITHUB_REF#refs/tags/}"
elif [[ ${{ github.ref }} == refs/heads/main ]]; then
PRIMARY_TAG="latest"
elif [[ ${{ github.ref }} == refs/heads/experimental ]]; then
PRIMARY_TAG="experimental"
elif [[ ${{ github.ref }} == refs/heads/dev ]]; then
PRIMARY_TAG="dev"
else
PRIMARY_TAG="latest"
fi
TAGS="$IMAGE_NAME:$PRIMARY_TAG,$IMAGE_NAME:${{ github.sha }}"
if [[ ${{ github.ref }} == refs/tags/v* ]]; then
MAJOR_MINOR_TAG=$(echo "$PRIMARY_TAG" | sed -E 's/^v([0-9]+\.[0-9]+)\.[0-9]+.*$/v\1/')
if [[ "$MAJOR_MINOR_TAG" != "$PRIMARY_TAG" ]]; then
TAGS="$TAGS,$IMAGE_NAME:$MAJOR_MINOR_TAG"
fi
MAJOR_TAG=$(echo "$PRIMARY_TAG" | sed -E 's/^v([0-9]+)\.[0-9]+\.[0-9]+.*$/v\1/')
if [[ "$MAJOR_TAG" != "$PRIMARY_TAG" ]]; then
TAGS="$TAGS,$IMAGE_NAME:$MAJOR_TAG"
fi
fi
echo "tags=$TAGS" >> $GITHUB_OUTPUT
echo "Generated tags: $TAGS"
- name: Build and push multi-arch image
uses: docker/build-push-action@v4
with:
context: .
platforms: linux/amd64,linux/arm64
push: true
cache-from: type=registry,ref=${{ vars.REGISTRY }}/${{ github.repository_owner }}/${{ vars.IMAGE_NAME }}:cache
cache-to: type=registry,ref=${{ vars.REGISTRY }}/${{ github.repository_owner }}/${{ vars.IMAGE_NAME }}:cache,mode=max
tags: ${{ steps.tags.outputs.tags }}
labels: |
org.opencontainers.image.source=${{ github.server_url }}/${{ github.repository }}
org.opencontainers.image.revision=${{ github.sha }}
org.opencontainers.image.created=${{ github.event.head_commit.timestamp }}
# ==========================================
# REPORTING STAGE
# ==========================================
test-report:
name: Generate Test Report
runs-on: ubuntu-latest
needs: [unit-tests, code-quality, security-scan, integration-tests]
if: always() && !inputs.skip_tests
steps:
- name: Download all artifacts
uses: actions/download-artifact@v3
- name: Generate test summary
run: |
echo "## 🧪 Test Results Summary" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### Job Status:" >> $GITHUB_STEP_SUMMARY
echo "- ✅ Unit Tests: \`${{ needs.unit-tests.result }}\`" >> $GITHUB_STEP_SUMMARY
echo "- 🎨 Code Quality: \`${{ needs.code-quality.result }}\`" >> $GITHUB_STEP_SUMMARY
echo "- 🔒 Security Scan: \`${{ needs.security-scan.result }}\`" >> $GITHUB_STEP_SUMMARY
echo "- 🔗 Integration Tests: \`${{ needs.integration-tests.result }}\`" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### Artifacts Generated:" >> $GITHUB_STEP_SUMMARY
echo "- Coverage reports (HTML & XML)" >> $GITHUB_STEP_SUMMARY
echo "- Code quality reports (flake8, pylint, black)" >> $GITHUB_STEP_SUMMARY
echo "- Security scan reports (bandit, safety)" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "**Commit:** \`${{ github.sha }}\`" >> $GITHUB_STEP_SUMMARY
echo "**Branch:** \`${{ github.ref_name }}\`" >> $GITHUB_STEP_SUMMARY
echo "**Triggered by:** ${{ github.actor }}" >> $GITHUB_STEP_SUMMARY
final-status:
name: CI/CD Pipeline Status
runs-on: ubuntu-latest
needs: [test-report, docker-build]
if: always()
steps:
- name: Check pipeline status
run: |
echo "## 🚀 CI/CD Pipeline Complete" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
if [[ "${{ needs.docker-build.result }}" == "success" ]]; then
echo "✅ **Docker image built and pushed successfully**" >> $GITHUB_STEP_SUMMARY
elif [[ "${{ needs.docker-build.result }}" == "skipped" ]]; then
echo "⏭️ **Docker build skipped**" >> $GITHUB_STEP_SUMMARY
else
echo "❌ **Docker build failed**" >> $GITHUB_STEP_SUMMARY
fi
echo "" >> $GITHUB_STEP_SUMMARY
echo "**Pipeline run:** ${{ github.run_number }}" >> $GITHUB_STEP_SUMMARY
echo "**Workflow:** ${{ github.workflow }}" >> $GITHUB_STEP_SUMMARY
- name: Fail if critical jobs failed
if: |
(needs.unit-tests.result == 'failure' && !inputs.skip_tests) ||
needs.docker-build.result == 'failure'
run: exit 1

View File

@@ -1,89 +0,0 @@
name: Docker Build and Push (Multi-architecture)
on:
push:
branches: [ main, experimental ]
tags: [ 'v*.*.*' ]
workflow_dispatch:
inputs:
image_tag:
description: 'Custom tag for the Docker image'
required: true
default: 'latest'
type: string
jobs:
build-and-push:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
with:
platforms: arm64
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
with:
platforms: linux/amd64,linux/arm64
driver-opts: |
image=moby/buildkit:latest
- name: Log in to registry
uses: docker/login-action@v2
with:
registry: ${{ vars.REGISTRY }}
username: ${{ secrets.REGISTRY_USERNAME }}
password: ${{ secrets.REGISTRY_PASSWORD }}
- name: Generate Docker image tags
id: tags
run: |
# Base image name
IMAGE_NAME="${{ vars.REGISTRY }}/${{ github.repository_owner }}/${{ vars.IMAGE_NAME }}"
# Determine primary tag
if [ -n "${{ github.event.inputs.image_tag }}" ]; then
PRIMARY_TAG="${{ github.event.inputs.image_tag }}"
elif [[ ${{ github.ref }} == refs/tags/v* ]]; then
PRIMARY_TAG="${GITHUB_REF#refs/tags/}"
elif [[ ${{ github.ref }} == refs/heads/main ]]; then
PRIMARY_TAG="latest"
elif [[ ${{ github.ref }} == refs/heads/experimental ]]; then
PRIMARY_TAG="experimental"
else
PRIMARY_TAG="latest"
fi
# Start with primary tag and SHA tag
TAGS="$IMAGE_NAME:$PRIMARY_TAG,$IMAGE_NAME:${{ github.sha }}"
# Add version tags for releases
if [[ ${{ github.ref }} == refs/tags/v* ]]; then
# Add major.minor tag (e.g., v1.2 for v1.2.3)
MAJOR_MINOR_TAG=$(echo "$PRIMARY_TAG" | sed -E 's/^v([0-9]+\.[0-9]+)\.[0-9]+.*$/v\1/')
if [[ "$MAJOR_MINOR_TAG" != "$PRIMARY_TAG" ]]; then
TAGS="$TAGS,$IMAGE_NAME:$MAJOR_MINOR_TAG"
fi
# Add major tag (e.g., v1 for v1.2.3)
MAJOR_TAG=$(echo "$PRIMARY_TAG" | sed -E 's/^v([0-9]+)\.[0-9]+\.[0-9]+.*$/v\1/')
if [[ "$MAJOR_TAG" != "$PRIMARY_TAG" ]]; then
TAGS="$TAGS,$IMAGE_NAME:$MAJOR_TAG"
fi
fi
echo "tags=$TAGS" >> $GITHUB_OUTPUT
echo "Generated tags: $TAGS"
- name: Build and push multi-arch image
uses: docker/build-push-action@v4
with:
context: .
platforms: linux/amd64,linux/arm64
push: true
cache-from: type=registry,ref=${{ vars.REGISTRY }}/${{ github.repository_owner }}/${{ vars.IMAGE_NAME }}:cache
cache-to: type=registry,ref=${{ vars.REGISTRY }}/${{ github.repository_owner }}/${{ vars.IMAGE_NAME }}:cache,mode=max
tags: ${{ steps.tags.outputs.tags }}

76
.gitignore vendored
View File

@@ -4,6 +4,14 @@ __pycache__/
*.py[cod]
*$py.class
# IDEs
.vscode/
.idea/
*.swp
*.swo
*~
.DS_Store
# C extensions
*.so
@@ -37,20 +45,33 @@ MANIFEST
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
# Testing
__pycache__/
*.py[cod]
*$py.class
*.so
.pytest_cache/
.coverage
.coverage.*
.cache
nosetests.xml
htmlcov/
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
.tox/
.nox/
# Test reports
test-results*.xml
junit*.xml
*-report.txt
*-report.json
bandit-report.json
safety-report.json
pip-audit-report.json
flake8-report.txt
pylint-report.txt
black-report.txt
mypy-report.txt
# Translations
*.mo
@@ -83,37 +104,7 @@ target/
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
@@ -161,13 +152,6 @@ dmypy.json
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/

View File

@@ -18,13 +18,13 @@ COPY requirements.txt .
RUN --mount=type=cache,target=/root/.cache/pip \
pip install --no-cache-dir -r requirements.txt
# Final stage - using smaller base image
FROM python:3.11-alpine3.18
# Final stage - using slim
FROM python:3.11-slim
# Install minimal runtime dependencies
RUN apk add --no-cache \
RUN apt-get update && apt-get install -y --no-install-recommends \
tini \
&& rm -rf /var/cache/apk/*
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
@@ -34,18 +34,22 @@ COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Create a non-root user
RUN addgroup -S bot && adduser -S bot -G bot
RUN groupadd -r bot && useradd -r -g bot bot
# Copy necessary files
COPY --chown=bot:bot *.py ./
COPY --chown=bot:bot entrypoint.sh ./
# Add other necessary directories/files as needed
# Create directories for persistent storage
RUN mkdir -p logs embed && \
chown -R bot:bot /app logs embed && \
chmod -R 777 /app logs embed
# Create and set permissions for matplotlib config directory
RUN mkdir -p /tmp/matplotlib && \
chown -R bot:bot /tmp/matplotlib && \
chmod -R 777 /tmp/matplotlib
# Switch to non root user
USER bot
@@ -54,6 +58,8 @@ ENV PYTHONUNBUFFERED=1
ENV CONFIG_PATH=/app/config.ini
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONPYCACHEPREFIX=/tmp
ENV MPLCONFIGDIR=/tmp/matplotlib
ENV MPLBACKEND=Agg
# Run the bot using tini and entrypoint script
ENTRYPOINT ["tini", "--", "/bin/sh", "entrypoint.sh"]

File diff suppressed because it is too large Load Diff

View File

@@ -30,7 +30,13 @@ import configparser
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from pathlib import Path
import generate_config
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend for server environments
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from collections import deque
import io
from server_metrics_graphs import ServerMetricsGraphs, ServerMetricsManager
# ==============================================
# LOGGING SETUP
@@ -549,10 +555,11 @@ class PterodactylBot(commands.Bot):
self.server_cache: Dict[str, dict] = {} # Cache of server data from Pterodactyl
self.embed_locations: Dict[str, Dict[str, int]] = {} # Tracks where embeds are posted
self.update_lock = asyncio.Lock() # Prevents concurrent updates
self.embed_storage_path = Path(EMBED_LOCATIONS_FILE) # File to store embed locations
self.embed_storage_path = Path(EMBED_LOCATIONS_FILE) # File to store embed
self.metrics_manager = ServerMetricsManager() # Data manager for metrics graphing system
# Track previous server states and CPU usage to detect changes
# Format: {server_id: (state, cpu_usage)}
self.previous_states: Dict[str, Tuple[str, float]] = {}
# Format: {server_id: (state, cpu_usage, last_force_update)}
self.previous_states: Dict[str, Tuple[str, float, Optional[float]]] = {}
logger.info("Initialized PterodactylBot instance with state tracking")
async def setup_hook(self):
@@ -750,25 +757,104 @@ class PterodactylBot(commands.Bot):
timestamp=datetime.now()
)
embed.add_field(name="Server ID", value=identifier, inline=True)
embed.add_field(name="🆔 Server ID", value=f"`{identifier}`", inline=True)
if is_suspended:
embed.add_field(name="Status", value="⛔ Suspended", inline=True)
embed.add_field(name=" Status", value="`Suspended`", inline=True)
else:
embed.add_field(name="Status", value="✅ Active", inline=True)
embed.add_field(name=" Status", value="`Active`", inline=True)
# Add resource usage if server is running
if current_state.lower() == "running":
if current_state.lower() != "offline":
# Current usage
cpu_usage = round(resource_attributes.get('resources', {}).get('cpu_absolute', 0), 2)
memory_usage = round(resource_attributes.get('resources', {}).get('memory_bytes', 0) / (1024 ** 2), 2)
disk_usage = round(resource_attributes.get('resources', {}).get('disk_bytes', 0) / (1024 ** 2), 2)
network_rx = round(resource_attributes.get('resources', {}).get('network_rx_bytes', 0) / (1024 ** 2), 2)
network_tx = round(resource_attributes.get('resources', {}).get('network_tx_bytes', 0) / (1024 ** 2), 2)
embed.add_field(name="CPU Usage", value=f"{cpu_usage}%", inline=True)
embed.add_field(name="Memory Usage", value=f"{memory_usage} MB", inline=True)
embed.add_field(name="Disk Usage", value=f"{disk_usage} MB", inline=True)
embed.add_field(name="Network", value=f"⬇️ {network_rx} MB / ⬆️ {network_tx} MB", inline=False)
# Maximum allocated resources from server data
limits = attributes.get('limits', {})
cpu_limit = limits.get('cpu', 0)
memory_limit = limits.get('memory', 0)
disk_limit = limits.get('disk', 0)
# Format limit values - display ∞ for unlimited (0 limit)
def format_limit(value, unit=""):
if value == 0:
return f"{'':<8}]{unit}" # Lemniscate symbol for infinity
else:
return f"{value:<8}]{unit}"
# Get uptime from Pterodactyl API (in milliseconds)
uptime_ms = resource_attributes.get('resources', {}).get('uptime', 0)
# Format uptime for display
if uptime_ms > 0:
uptime_seconds = uptime_ms // 1000 # Convert ms to seconds
if uptime_seconds < 60:
uptime_text = f"`{uptime_seconds}s`"
elif uptime_seconds < 3600:
uptime_text = f"`{uptime_seconds // 60}m {uptime_seconds % 60}s`"
elif uptime_seconds < 86400:
hours = uptime_seconds // 3600
minutes = (uptime_seconds % 3600) // 60
uptime_text = f"`{hours}h {minutes}m`"
else:
days = uptime_seconds // 86400
hours = (uptime_seconds % 86400) // 3600
uptime_text = f"`{days}d {hours}h`"
else:
uptime_text = "`Just started`"
embed.add_field(name="⏱️ Uptime", value=uptime_text, inline=True)
# Create dedicated usage text box with current usage and limits in monospace font
usage_text = (
f"```properties\n"
f"CPU : [{cpu_usage:>8} / {format_limit(cpu_limit, ' %')}\n"
f"Memory : [{memory_usage:>8} / {format_limit(memory_limit, ' MiB')}\n"
f"Disk : [{disk_usage:>8} / {format_limit(disk_limit, ' MiB')}\n"
f"```"
)
embed.add_field(
name="📊 Resource Usage",
value=usage_text,
inline=False
)
embed.add_field(
name="Network In",
value=f"📥 `{network_rx} MiB`",
inline=True
)
embed.add_field(
name="Network Out",
value=f"📤 `{network_tx} MiB`",
inline=True
)
# Add graph images if available
server_graphs = self.metrics_manager.get_server_graphs(identifier)
if server_graphs and server_graphs.has_sufficient_data:
summary = server_graphs.get_data_summary()
graph_description = (
f">>> `Data points: {summary['point_count']}/6`\n"
f"`CPU trend: {summary['cpu_trend']} • Memory trend: {summary['memory_trend']}`"
)
# Add a field explaining the graphs
embed.add_field(
name="📈 Usage Trends (Last Minute)",
value=graph_description,
inline=False
)
# Set graph images (these will be attached as files in the update_status method)
embed.set_image(url=f"attachment://metrics_graph_{identifier}.png")
embed.set_footer(text="Last updated")
@@ -790,6 +876,7 @@ class PterodactylBot(commands.Bot):
1. Server power state changes (started/stopped/restarted)
2. Significant CPU usage change (>50% difference)
3. First time seeing the server
4. Server has been running for 10 minutes (force update for uptime)
This minimizes API calls to Discord and updates while maintaining
real-time awareness of important server changes.
@@ -806,12 +893,17 @@ class PterodactylBot(commands.Bot):
# Update our local cache with fresh server data
self.server_cache = {server['attributes']['identifier']: server for server in servers}
logger.debug(f"Updated server cache with {len(servers)} servers")
# Clean up metrics for servers that no longer exist
active_server_ids = list(self.server_cache.keys())
self.metrics_manager.cleanup_old_servers(active_server_ids)
# Variables to track our update statistics
update_count = 0 # Successful updates
error_count = 0 # Failed updates
missing_count = 0 # Missing embeds
skipped_count = 0 # Servers that didn't need updates
current_time = datetime.now().timestamp()
# Process each server we're tracking embeds for
for server_id, location in list(self.embed_locations.items()):
@@ -830,9 +922,15 @@ class PterodactylBot(commands.Bot):
resources = await self.pterodactyl_api.get_server_resources(server_id)
current_state = resources.get('attributes', {}).get('current_state', 'offline')
cpu_usage = round(resources.get('attributes', {}).get('resources', {}).get('cpu_absolute', 0), 2)
# Retrieve previous recorded state and CPU usage
prev_state, prev_cpu = self.previous_states.get(server_id, (None, 0))
# Collect metrics data for running servers
if current_state == 'running':
memory_usage = round(resources.get('attributes', {}).get('resources', {}).get('memory_bytes', 0) / (1024 ** 2), 2)
self.metrics_manager.add_server_data(server_id, server_name, cpu_usage, memory_usage)
logger.debug(f"Added metrics data for {server_name}: CPU={cpu_usage}%, Memory={memory_usage}MB")
# Retrieve previous recorded state, CPU usage, and last force update time
prev_state, prev_cpu, last_force_update = self.previous_states.get(server_id, (None, 0, None))
# DECISION LOGIC: Should we update the embed?
needs_update = False
@@ -852,6 +950,15 @@ class PterodactylBot(commands.Bot):
logger.debug(f"First check for {server_name}, performing initial update")
needs_update = True
# 4. Force update every 10 minutes for running servers (for uptime counter)
elif (current_state == 'running' and
(last_force_update is None or
current_time - last_force_update >= 600)): # 10 minutes = 600 seconds
logger.debug(f"Executing 10-minute force update for running server {server_name}")
needs_update = True
# Update the last force update time
last_force_update = current_time
# PERFORM UPDATE IF NEEDED
if needs_update:
# Generate fresh embed and view components
@@ -865,14 +972,38 @@ class PterodactylBot(commands.Bot):
# Fetch and update the existing message
message = await channel.fetch_message(int(location['message_id']))
await message.edit(embed=embed, view=view)
# Check if server is transitioning to offline/stopping state
# and remove image attachment if present
files = []
server_graphs = self.metrics_manager.get_server_graphs(server_id)
# Only include graph images if server is running AND has sufficient data
if (current_state == 'running' and
server_graphs and
server_graphs.has_sufficient_data):
# Generate metrics graph
combined_graph = server_graphs.generate_combined_graph()
if combined_graph:
files.append(discord.File(combined_graph, filename=f"metrics_graph_{server_id}.png"))
logger.debug(f"Including metrics graph for running server {server_name}")
else:
# Server is offline/stopping - ensure no image is attached
logger.debug(f"Server {server_name} is {current_state}, removing image attachment if present")
# We'll update without files to remove any existing attachments
# Update message with embed, view, and files (empty files list removes attachments)
await message.edit(embed=embed, view=view, attachments=files)
update_count += 1
logger.debug(f"Updated status for {server_name}")
# Update our state tracking with new values
self.previous_states[server_id] = (current_state, cpu_usage)
# Only update last_force_update if this was a force update
new_last_force_update = last_force_update if needs_update and current_state == 'running' and current_time - (last_force_update or 0) >= 600 else (last_force_update if last_force_update is not None else None)
self.previous_states[server_id] = (current_state, cpu_usage, new_last_force_update)
else:
# No significant changes detected
# No significant changes detected, but update tracking with current state
self.previous_states[server_id] = (current_state, cpu_usage, last_force_update)
skipped_count += 1
logger.debug(f"No changes detected for {server_name}, skipping update")
@@ -1436,4 +1567,4 @@ if __name__ == "__main__":
sys.exit(1) # Exit with error code for crash
finally:
logger.info("Bot shutdown complete")
sys.exit(0) # Explicit clean exit
sys.exit(0) # Explicit clean exit

26
requirements-test.txt Normal file
View File

@@ -0,0 +1,26 @@
# Testing Dependencies for Pterodactyl Discord Bot
# Core testing framework
pytest>=7.4.0
pytest-asyncio>=0.21.0
pytest-cov>=4.1.0
pytest-mock>=3.11.1
pytest-timeout>=2.1.0
# Code quality and linting
flake8>=6.0.0
pylint>=2.17.0
black>=23.7.0
isort>=5.12.0
# Security scanning
bandit>=1.7.5
safety>=2.3.5
# Mocking and fixtures
pytest-fixtures>=0.1.0
freezegun>=1.2.2
# Coverage reporting
coverage>=7.2.7
coverage-badge>=1.1.0

View File

@@ -1,4 +1,5 @@
discord.py>=2.3.0
aiohttp>=3.8.0
configparser>=5.3.0
python-dotenv
python-dotenv
matplotlib

472
server_metrics_graphs.py Normal file
View File

@@ -0,0 +1,472 @@
"""
Server Metrics Graphs Module for Pterodactyl Discord Bot
This module provides graphing capabilities for server CPU and memory usage.
Generates line graphs as PNG images for embedding in Discord messages.
"""
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend for server environments
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from collections import deque
from datetime import datetime, timedelta
from typing import Dict, Tuple, Optional
import io
import logging
import math
# Get the logger from the main bot module
logger = logging.getLogger('pterodisbot')
class ServerMetricsGraphs:
"""
Manages CPU and memory usage graphs for individual servers.
Features:
- Stores last 6 data points (1 minute of history at 10-second intervals)
- Generates PNG images of line graphs for Discord embedding
- Automatic data rotation (FIFO queue with max 6 points)
- Separate tracking for CPU percentage and memory MB usage
- Dynamic CPU scaling in 100% increments for multi-vCPU servers
- Clean graph styling optimized for Discord dark theme
"""
def __init__(self, server_id: str, server_name: str):
"""
Initialize metrics tracking for a server.
Args:
server_id: Pterodactyl server identifier
server_name: Human-readable server name
"""
self.server_id = server_id
self.server_name = server_name
# Use deque with maxlen=6 for automatic FIFO rotation
# Each entry is a tuple: (timestamp, cpu_percent, memory_mb)
self.data_points = deque(maxlen=6)
# Track if we have enough data for meaningful graphs (at least 2 points)
self.has_sufficient_data = False
logger.debug(f"Initialized metrics tracking for server {server_name} ({server_id})")
def add_data_point(self, cpu_percent: float, memory_mb: float, timestamp: Optional[datetime] = None):
"""
Add a new data point to the metrics history.
Args:
cpu_percent: Current CPU usage percentage
memory_mb: Current memory usage in megabytes
timestamp: Optional timestamp, defaults to current time
"""
if timestamp is None:
timestamp = datetime.now()
# Add new data point (automatically rotates old data due to maxlen=6)
self.data_points.append((timestamp, cpu_percent, memory_mb))
# Update sufficient data flag
self.has_sufficient_data = len(self.data_points) >= 2
logger.debug(f"Added metrics data point for {self.server_name}: CPU={cpu_percent}%, Memory={memory_mb}MB")
def _calculate_cpu_scale_limit(self, max_cpu_value: float) -> int:
"""
Calculate appropriate CPU scale limit in 100% increments.
Args:
max_cpu_value: Maximum CPU value in the dataset
Returns:
Scale limit rounded up to nearest 100% increment
"""
if max_cpu_value <= 100:
return 100
# Round up to nearest 100% increment
# e.g., 150% -> 200%, 250% -> 300%, 350% -> 400%
return math.ceil(max_cpu_value / 100) * 100
def generate_cpu_graph(self) -> Optional[io.BytesIO]:
"""
Generate a CPU usage line graph as a PNG image.
Returns:
BytesIO object containing PNG image data, or None if insufficient data
"""
if not self.has_sufficient_data:
logger.debug(f"Insufficient data for CPU graph generation: {self.server_name}")
return None
try:
# Extract timestamps and CPU data
timestamps = [point[0] for point in self.data_points]
cpu_values = [point[1] for point in self.data_points]
# Calculate dynamic CPU scale limit
max_cpu = max(cpu_values)
cpu_scale_limit = self._calculate_cpu_scale_limit(max_cpu)
# Create figure with dark theme styling
plt.style.use('dark_background')
fig, ax = plt.subplots(figsize=(8, 4), dpi=100)
fig.patch.set_facecolor('#2f3136') # Discord dark theme background
ax.set_facecolor('#36393f') # Slightly lighter for graph area
# Plot CPU line with gradient fill
line = ax.plot(timestamps, cpu_values, color='#7289da', linewidth=2.5, marker='o', markersize=4)
ax.fill_between(timestamps, cpu_values, alpha=0.3, color='#7289da')
# Customize axes with dynamic scaling
ax.set_ylabel('CPU Usage (%)', color='#ffffff', fontsize=10)
ax.set_ylim(0, cpu_scale_limit)
# Add horizontal grid lines at 100% increments for better readability
for i in range(100, cpu_scale_limit + 1, 100):
ax.axhline(y=i, color='#ffffff', alpha=0.2, linestyle='--', linewidth=0.8)
# Format time axis
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
ax.xaxis.set_major_locator(mdates.SecondLocator(interval=20))
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right', color='#ffffff', fontsize=8)
# Style the graph
ax.tick_params(colors='#ffffff', labelsize=8)
ax.grid(True, alpha=0.3, color='#ffffff')
ax.spines['bottom'].set_color('#ffffff')
ax.spines['left'].set_color('#ffffff')
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
# Add title with scale info for multi-vCPU servers
title = f'{self.server_name} - CPU Usage'
if cpu_scale_limit > 100:
estimated_vcpus = cpu_scale_limit // 100
title += f' (~{estimated_vcpus} vCPU cores)'
ax.set_title(title, color='#ffffff', fontsize=12, pad=20)
# Tight layout to prevent label cutoff
plt.tight_layout()
# Save to BytesIO
img_buffer = io.BytesIO()
plt.savefig(img_buffer, format='png', facecolor='#2f3136', edgecolor='none',
bbox_inches='tight', dpi=100)
img_buffer.seek(0)
# Clean up matplotlib resources
plt.close(fig)
logger.debug(f"Generated CPU graph for {self.server_name} (scale: 0-{cpu_scale_limit}%)")
return img_buffer
except Exception as e:
logger.error(f"Failed to generate CPU graph for {self.server_name}: {str(e)}")
plt.close('all') # Clean up any remaining figures
return None
def generate_memory_graph(self) -> Optional[io.BytesIO]:
"""
Generate a memory usage line graph as a PNG image.
Returns:
BytesIO object containing PNG image data, or None if insufficient data
"""
if not self.has_sufficient_data:
logger.debug(f"Insufficient data for memory graph generation: {self.server_name}")
return None
try:
# Extract timestamps and memory data
timestamps = [point[0] for point in self.data_points]
memory_values = [point[2] for point in self.data_points]
# Create figure with dark theme styling
plt.style.use('dark_background')
fig, ax = plt.subplots(figsize=(8, 4), dpi=100)
fig.patch.set_facecolor('#2f3136') # Discord dark theme background
ax.set_facecolor('#36393f') # Slightly lighter for graph area
# Plot memory line with gradient fill
line = ax.plot(timestamps, memory_values, color='#43b581', linewidth=2.5, marker='o', markersize=4)
ax.fill_between(timestamps, memory_values, alpha=0.3, color='#43b581')
# Customize axes
ax.set_ylabel('Memory Usage (MB)', color='#ffffff', fontsize=10)
ax.set_ylim(0, max(memory_values) * 1.1) # Dynamic scaling with 10% padding
# Format time axis
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
ax.xaxis.set_major_locator(mdates.SecondLocator(interval=20))
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right', color='#ffffff', fontsize=8)
# Style the graph
ax.tick_params(colors='#ffffff', labelsize=8)
ax.grid(True, alpha=0.3, color='#ffffff')
ax.spines['bottom'].set_color('#ffffff')
ax.spines['left'].set_color('#ffffff')
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
# Add title
ax.set_title(f'{self.server_name} - Memory Usage', color='#ffffff', fontsize=12, pad=20)
# Tight layout to prevent label cutoff
plt.tight_layout()
# Save to BytesIO
img_buffer = io.BytesIO()
plt.savefig(img_buffer, format='png', facecolor='#2f3136', edgecolor='none',
bbox_inches='tight', dpi=100)
img_buffer.seek(0)
# Clean up matplotlib resources
plt.close(fig)
logger.debug(f"Generated memory graph for {self.server_name}")
return img_buffer
except Exception as e:
logger.error(f"Failed to generate memory graph for {self.server_name}: {str(e)}")
plt.close('all') # Clean up any remaining figures
return None
def generate_combined_graph(self) -> Optional[io.BytesIO]:
"""
Generate a combined CPU and memory usage graph as a PNG image.
Returns:
BytesIO object containing PNG image data, or None if insufficient data
"""
if not self.has_sufficient_data:
logger.debug(f"Insufficient data for combined graph generation: {self.server_name}")
return None
try:
# Extract data
timestamps = [point[0] for point in self.data_points]
cpu_values = [point[1] for point in self.data_points]
memory_values = [point[2] for point in self.data_points]
# Calculate dynamic CPU scale limit
max_cpu = max(cpu_values)
cpu_scale_limit = self._calculate_cpu_scale_limit(max_cpu)
# Create figure with two subplots
plt.style.use('dark_background')
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 6), dpi=100, sharex=True)
fig.patch.set_facecolor('#2f3136')
# CPU subplot
ax1.set_facecolor('#36393f')
ax1.plot(timestamps, cpu_values, color='#7289da', linewidth=2.5, marker='o', markersize=4)
ax1.fill_between(timestamps, cpu_values, alpha=0.3, color='#7289da')
ax1.set_ylabel('CPU Usage (%)', color='#ffffff', fontsize=10)
ax1.set_ylim(0, cpu_scale_limit)
ax1.tick_params(colors='#ffffff', labelsize=8)
ax1.grid(True, alpha=0.3, color='#ffffff')
# Add horizontal grid lines at 100% increments for CPU subplot
for i in range(100, cpu_scale_limit + 1, 100):
ax1.axhline(y=i, color='#ffffff', alpha=0.2, linestyle='--', linewidth=0.8)
# Title with vCPU info if applicable
title = f'{self.server_name} - Resource Usage'
if cpu_scale_limit > 100:
estimated_vcpus = cpu_scale_limit // 100
title += f' (~{estimated_vcpus} vCPU cores)'
ax1.set_title(title, color='#ffffff', fontsize=12)
# Memory subplot
ax2.set_facecolor('#36393f')
ax2.plot(timestamps, memory_values, color='#43b581', linewidth=2.5, marker='o', markersize=4)
ax2.fill_between(timestamps, memory_values, alpha=0.3, color='#43b581')
ax2.set_ylabel('Memory (MB)', color='#ffffff', fontsize=10)
ax2.set_ylim(0, max(memory_values) * 1.1)
ax2.tick_params(colors='#ffffff', labelsize=8)
ax2.grid(True, alpha=0.3, color='#ffffff')
# Format time axis (only on bottom subplot)
ax2.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
ax2.xaxis.set_major_locator(mdates.SecondLocator(interval=20))
plt.setp(ax2.xaxis.get_majorticklabels(), rotation=45, ha='right', color='#ffffff', fontsize=8)
# Style both subplots
for ax in [ax1, ax2]:
ax.spines['bottom'].set_color('#ffffff')
ax.spines['left'].set_color('#ffffff')
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
plt.tight_layout()
# Save to BytesIO
img_buffer = io.BytesIO()
plt.savefig(img_buffer, format='png', facecolor='#2f3136', edgecolor='none',
bbox_inches='tight', dpi=100)
img_buffer.seek(0)
plt.close(fig)
logger.debug(f"Generated combined graph for {self.server_name} (CPU scale: 0-{cpu_scale_limit}%)")
return img_buffer
except Exception as e:
logger.error(f"Failed to generate combined graph for {self.server_name}: {str(e)}")
plt.close('all')
return None
def get_data_summary(self) -> Dict[str, any]:
"""
Get summary statistics for the current data points.
Returns:
Dictionary containing data point count, latest values, and trends
"""
if not self.data_points:
return {
'point_count': 0,
'has_data': False,
'latest_cpu': 0,
'latest_memory': 0
}
# Get latest values
latest_point = self.data_points[-1]
latest_cpu = latest_point[1]
latest_memory = latest_point[2]
# Calculate CPU scale info
max_cpu = max(point[1] for point in self.data_points)
cpu_scale_limit = self._calculate_cpu_scale_limit(max_cpu)
estimated_vcpus = cpu_scale_limit // 100
# Calculate trends if we have multiple points
cpu_trend = 'stable'
memory_trend = 'stable'
if len(self.data_points) >= 2:
first_point = self.data_points[0]
cpu_change = latest_cpu - first_point[1]
memory_change = latest_memory - first_point[2]
# Determine trends (>5% change considered significant)
if abs(cpu_change) > 5:
cpu_trend = 'increasing' if cpu_change > 0 else 'decreasing'
if abs(memory_change) > 50: # 50MB change threshold
memory_trend = 'increasing' if memory_change > 0 else 'decreasing'
return {
'point_count': len(self.data_points),
'has_data': self.has_sufficient_data,
'latest_cpu': latest_cpu,
'latest_memory': latest_memory,
'cpu_trend': cpu_trend,
'memory_trend': memory_trend,
'cpu_scale_limit': cpu_scale_limit,
'estimated_vcpus': estimated_vcpus,
'time_span_minutes': len(self.data_points) * 10 / 60 # Convert to minutes
}
class ServerMetricsManager:
"""
Global manager for all server metrics graphs.
Handles:
- Creation and cleanup of ServerMetricsGraphs instances
- Bulk operations across all tracked servers
- Memory management for graph storage
"""
def __init__(self):
"""Initialize the metrics manager."""
self.server_graphs: Dict[str, ServerMetricsGraphs] = {}
logger.info("Initialized ServerMetricsManager")
def get_or_create_server_graphs(self, server_id: str, server_name: str) -> ServerMetricsGraphs:
"""
Get existing ServerMetricsGraphs instance or create a new one.
Args:
server_id: Pterodactyl server identifier
server_name: Human-readable server name
Returns:
ServerMetricsGraphs instance for the specified server
"""
if server_id not in self.server_graphs:
self.server_graphs[server_id] = ServerMetricsGraphs(server_id, server_name)
logger.debug(f"Created new metrics graphs for server {server_name}")
return self.server_graphs[server_id]
def add_server_data(self, server_id: str, server_name: str, cpu_percent: float, memory_mb: float):
"""
Add data point to a server's metrics tracking.
Args:
server_id: Pterodactyl server identifier
server_name: Human-readable server name
cpu_percent: Current CPU usage percentage
memory_mb: Current memory usage in megabytes
"""
graphs = self.get_or_create_server_graphs(server_id, server_name)
graphs.add_data_point(cpu_percent, memory_mb)
def remove_server(self, server_id: str):
"""
Remove a server from metrics tracking.
Args:
server_id: Pterodactyl server identifier to remove
"""
if server_id in self.server_graphs:
del self.server_graphs[server_id]
logger.debug(f"Removed metrics tracking for server {server_id}")
def get_server_graphs(self, server_id: str) -> Optional[ServerMetricsGraphs]:
"""
Get ServerMetricsGraphs instance for a specific server.
Args:
server_id: Pterodactyl server identifier
Returns:
ServerMetricsGraphs instance or None if not found
"""
return self.server_graphs.get(server_id)
def cleanup_old_servers(self, active_server_ids: list):
"""
Remove tracking for servers that no longer exist.
Args:
active_server_ids: List of currently active server IDs
"""
servers_to_remove = []
for server_id in self.server_graphs:
if server_id not in active_server_ids:
servers_to_remove.append(server_id)
for server_id in servers_to_remove:
self.remove_server(server_id)
if servers_to_remove:
logger.info(f"Cleaned up metrics for {len(servers_to_remove)} inactive servers")
def get_summary(self) -> Dict[str, any]:
"""
Get summary of all tracked servers.
Returns:
Dictionary with tracking statistics
"""
return {
'total_servers': len(self.server_graphs),
'servers_with_data': sum(1 for graphs in self.server_graphs.values() if graphs.has_sufficient_data),
'total_data_points': sum(len(graphs.data_points) for graphs in self.server_graphs.values())
}

797
test_pterodisbot.py Normal file
View File

@@ -0,0 +1,797 @@
"""
Unit and Integration Tests for Pterodactyl Discord Bot
Test coverage:
- Configuration validation
- Pterodactyl API client operations
- Discord bot commands and interactions
- Server metrics tracking
- Embed management
- Error handling
"""
import pytest
import asyncio
import json
import os
from unittest.mock import Mock, AsyncMock, patch, MagicMock
from datetime import datetime
import configparser
import discord
from discord.ext import commands
import aiohttp
# Import the modules to test
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from pterodisbot import (
PterodactylAPI,
ServerStatusView,
PterodactylBot,
ConfigValidationError,
validate_config,
REQUIRED_ROLE
)
from server_metrics_graphs import ServerMetricsGraphs, ServerMetricsManager
# ==========================================
# FIXTURES
# ==========================================
@pytest.fixture
def mock_config():
"""
Create a mock configuration for testing.
Returns:
ConfigParser: A properly configured test configuration object
"""
config = configparser.ConfigParser()
config['Pterodactyl'] = {
'PanelURL': 'https://panel.example.com',
'ClientAPIKey': 'ptlc_test_client_key_123',
'ApplicationAPIKey': 'ptla_test_app_key_456'
}
config['Discord'] = {
'Token': 'test_discord_token',
'AllowedGuildID': '123456789'
}
return config
@pytest.fixture
def mock_pterodactyl_api():
"""
Create a mock PterodactylAPI instance with properly configured session.
Returns:
PterodactylAPI: A mocked API instance ready for testing
"""
api = PterodactylAPI(
'https://panel.example.com',
'ptlc_test_client_key',
'ptla_test_app_key'
)
# Create a proper async mock session
api.session = AsyncMock(spec=aiohttp.ClientSession)
api.session.close = AsyncMock() # Ensure close is an async mock
return api
@pytest.fixture
def sample_server_data():
"""
Sample server data from Pterodactyl API.
Returns:
dict: Server attributes in Pterodactyl API format
"""
return {
'attributes': {
'identifier': 'abc123',
'name': 'Test Server',
'description': 'A test game server',
'suspended': False,
'limits': {
'cpu': 200,
'memory': 2048,
'disk': 10240
}
}
}
@pytest.fixture
def sample_resources_data():
"""
Sample resource usage data from Pterodactyl API.
Returns:
dict: Resource usage attributes in Pterodactyl API format
"""
return {
'attributes': {
'current_state': 'running',
'resources': {
'cpu_absolute': 45.5,
'memory_bytes': 1073741824, # 1GB
'disk_bytes': 5368709120, # 5GB
'network_rx_bytes': 10485760, # 10MB
'network_tx_bytes': 5242880, # 5MB
'uptime': 3600000 # 1 hour in milliseconds
}
}
}
@pytest.fixture
def mock_discord_interaction():
"""
Create a mock Discord interaction with properly configured user roles.
Returns:
AsyncMock: A mocked Discord interaction object
"""
interaction = AsyncMock(spec=discord.Interaction)
interaction.user = Mock()
interaction.user.name = 'TestUser'
# Create mock role with proper name attribute
mock_role = Mock()
mock_role.name = REQUIRED_ROLE
interaction.user.roles = [mock_role]
interaction.guild_id = 123456789
interaction.channel = Mock()
interaction.channel.id = 987654321
interaction.response = AsyncMock()
interaction.followup = AsyncMock()
return interaction
# ==========================================
# CONFIGURATION VALIDATION TESTS
# ==========================================
class TestConfigValidation:
"""Test configuration validation logic."""
def test_valid_config(self, mock_config, monkeypatch):
"""
Test that valid configuration passes validation.
Args:
mock_config: Pytest fixture providing valid config
monkeypatch: Pytest monkeypatch fixture for patching
"""
monkeypatch.setattr('pterodisbot.config', mock_config)
# Should not raise any exceptions
try:
validate_config()
except ConfigValidationError:
pytest.fail("Valid configuration should not raise ConfigValidationError")
def test_missing_pterodactyl_section(self, monkeypatch):
"""
Test validation fails with missing Pterodactyl section.
Args:
monkeypatch: Pytest monkeypatch fixture for patching
"""
config = configparser.ConfigParser()
config['Discord'] = {
'Token': 'test_token',
'AllowedGuildID': '123456789'
}
monkeypatch.setattr('pterodisbot.config', config)
with pytest.raises(ConfigValidationError, match="Missing \\[Pterodactyl\\] section"):
validate_config()
def test_invalid_api_key_prefix(self, mock_config, monkeypatch):
"""
Test validation fails with incorrect API key prefix.
Args:
mock_config: Pytest fixture providing config
monkeypatch: Pytest monkeypatch fixture for patching
"""
mock_config['Pterodactyl']['ClientAPIKey'] = 'invalid_prefix_key'
monkeypatch.setattr('pterodisbot.config', mock_config)
with pytest.raises(ConfigValidationError, match="ClientAPIKey should start with 'ptlc_'"):
validate_config()
def test_invalid_guild_id(self, mock_config, monkeypatch):
"""
Test validation fails with invalid guild ID.
Args:
mock_config: Pytest fixture providing config
monkeypatch: Pytest monkeypatch fixture for patching
"""
mock_config['Discord']['AllowedGuildID'] = 'not_a_number'
monkeypatch.setattr('pterodisbot.config', mock_config)
with pytest.raises(ConfigValidationError, match="AllowedGuildID must be a valid integer"):
validate_config()
def test_invalid_panel_url(self, mock_config, monkeypatch):
"""
Test validation fails with invalid panel URL.
Args:
mock_config: Pytest fixture providing config
monkeypatch: Pytest monkeypatch fixture for patching
"""
mock_config['Pterodactyl']['PanelURL'] = 'not-a-url'
monkeypatch.setattr('pterodisbot.config', mock_config)
with pytest.raises(ConfigValidationError, match="PanelURL must start with http"):
validate_config()
# ==========================================
# PTERODACTYL API TESTS
# ==========================================
class TestPterodactylAPI:
"""Test Pterodactyl API client functionality."""
@pytest.mark.asyncio
async def test_initialize(self):
"""
Test API client initialization.
Verifies that the API client properly creates an aiohttp session
"""
api = PterodactylAPI('https://panel.example.com', 'ptlc_key', 'ptla_key')
await api.initialize()
assert api.session is not None
assert isinstance(api.session, aiohttp.ClientSession)
await api.close()
@pytest.mark.asyncio
async def test_close(self, mock_pterodactyl_api):
"""
Test API client cleanup properly calls session.close().
Args:
mock_pterodactyl_api: Pytest fixture providing mocked API instance
"""
# Ensure the session is marked as not closed
mock_pterodactyl_api.session.closed = False
await mock_pterodactyl_api.close()
# Verify close was called once
mock_pterodactyl_api.session.close.assert_called_once()
@pytest.mark.asyncio
async def test_request_success(self, mock_pterodactyl_api):
"""
Test successful API request with properly mocked context manager.
Args:
mock_pterodactyl_api: Pytest fixture providing mocked API instance
"""
# Create a mock response
mock_response = AsyncMock()
mock_response.status = 200
mock_response.json = AsyncMock(return_value={'data': 'test'})
# Create a mock context manager that returns the response
mock_context = AsyncMock()
mock_context.__aenter__.return_value = mock_response
mock_context.__aexit__.return_value = AsyncMock()
# Configure the session.request to return the context manager
mock_pterodactyl_api.session.request = Mock(return_value=mock_context)
result = await mock_pterodactyl_api._request('GET', 'test/endpoint')
assert result == {'data': 'test'}
mock_pterodactyl_api.session.request.assert_called_once()
@pytest.mark.asyncio
async def test_request_error(self, mock_pterodactyl_api):
"""
Test API request error handling with properly mocked context manager.
Args:
mock_pterodactyl_api: Pytest fixture providing mocked API instance
"""
# Create a mock error response
mock_response = AsyncMock()
mock_response.status = 404
mock_response.json = AsyncMock(return_value={
'errors': [{'detail': 'Server not found'}]
})
# Create a mock context manager that returns the error response
mock_context = AsyncMock()
mock_context.__aenter__.return_value = mock_response
mock_context.__aexit__.return_value = AsyncMock()
# Configure the session.request to return the context manager
mock_pterodactyl_api.session.request = Mock(return_value=mock_context)
result = await mock_pterodactyl_api._request('GET', 'test/endpoint')
assert result['status'] == 'error'
assert 'Server not found' in result['message']
@pytest.mark.asyncio
async def test_get_servers(self, mock_pterodactyl_api, sample_server_data):
"""
Test retrieving server list from API.
Args:
mock_pterodactyl_api: Pytest fixture providing mocked API instance
sample_server_data: Pytest fixture providing sample server data
"""
mock_pterodactyl_api._request = AsyncMock(return_value={
'data': [sample_server_data]
})
servers = await mock_pterodactyl_api.get_servers()
assert len(servers) == 1
assert servers[0] == sample_server_data
mock_pterodactyl_api._request.assert_called_once_with(
'GET', 'application/servers', use_application_key=True
)
@pytest.mark.asyncio
async def test_get_server_resources(self, mock_pterodactyl_api, sample_resources_data):
"""
Test retrieving server resource usage from API.
Args:
mock_pterodactyl_api: Pytest fixture providing mocked API instance
sample_resources_data: Pytest fixture providing sample resource data
"""
mock_pterodactyl_api._request = AsyncMock(return_value=sample_resources_data)
resources = await mock_pterodactyl_api.get_server_resources('abc123')
assert resources['attributes']['current_state'] == 'running'
mock_pterodactyl_api._request.assert_called_once_with(
'GET', 'client/servers/abc123/resources'
)
@pytest.mark.asyncio
async def test_send_power_action_valid(self, mock_pterodactyl_api):
"""
Test sending valid power action to server.
Args:
mock_pterodactyl_api: Pytest fixture providing mocked API instance
"""
mock_pterodactyl_api._request = AsyncMock(return_value={'status': 'success'})
result = await mock_pterodactyl_api.send_power_action('abc123', 'start')
assert result['status'] == 'success'
mock_pterodactyl_api._request.assert_called_once_with(
'POST', 'client/servers/abc123/power', {'signal': 'start'}
)
@pytest.mark.asyncio
async def test_send_power_action_invalid(self, mock_pterodactyl_api):
"""
Test sending invalid power action returns error.
Args:
mock_pterodactyl_api: Pytest fixture providing mocked API instance
"""
result = await mock_pterodactyl_api.send_power_action('abc123', 'invalid_action')
assert result['status'] == 'error'
assert 'Invalid action' in result['message']
# ==========================================
# SERVER METRICS TESTS
# ==========================================
class TestServerMetricsGraphs:
"""Test server metrics tracking and graphing."""
def test_initialization(self):
"""
Test metrics graph initialization with empty state.
"""
graphs = ServerMetricsGraphs('abc123', 'Test Server')
assert graphs.server_id == 'abc123'
assert graphs.server_name == 'Test Server'
assert len(graphs.data_points) == 0
assert graphs.has_sufficient_data is False
def test_add_data_point(self):
"""
Test adding data points and checking sufficient data threshold.
"""
graphs = ServerMetricsGraphs('abc123', 'Test Server')
graphs.add_data_point(50.0, 1024.0)
assert len(graphs.data_points) == 1
assert graphs.has_sufficient_data is False
graphs.add_data_point(55.0, 1100.0)
assert len(graphs.data_points) == 2
assert graphs.has_sufficient_data is True
def test_data_rotation(self):
"""
Test automatic data point rotation (FIFO with maxlen=6).
"""
graphs = ServerMetricsGraphs('abc123', 'Test Server')
# Add 8 data points to test rotation
for i in range(8):
graphs.add_data_point(float(i * 10), float(i * 100))
# Should only keep the last 6
assert len(graphs.data_points) == 6
assert graphs.data_points[0][1] == 20.0 # CPU of 3rd point
assert graphs.data_points[-1][1] == 70.0 # CPU of 8th point
def test_cpu_scale_calculation(self):
"""
Test dynamic CPU scale limit calculation for multi-vCPU servers.
"""
graphs = ServerMetricsGraphs('abc123', 'Test Server')
# Test single vCPU (<=100%)
assert graphs._calculate_cpu_scale_limit(75.0) == 100
assert graphs._calculate_cpu_scale_limit(100.0) == 100
# Test multi-vCPU scenarios
assert graphs._calculate_cpu_scale_limit(150.0) == 200
assert graphs._calculate_cpu_scale_limit(250.0) == 300
assert graphs._calculate_cpu_scale_limit(350.0) == 400
def test_get_data_summary(self):
"""
Test data summary generation including trends.
"""
graphs = ServerMetricsGraphs('abc123', 'Test Server')
# No data case
summary = graphs.get_data_summary()
assert summary['point_count'] == 0
assert summary['has_data'] is False
# Add data points with increasing trend
graphs.add_data_point(50.0, 1000.0)
graphs.add_data_point(60.0, 1100.0)
summary = graphs.get_data_summary()
assert summary['point_count'] == 2
assert summary['has_data'] is True
assert summary['latest_cpu'] == 60.0
assert summary['latest_memory'] == 1100.0
assert summary['cpu_trend'] == 'increasing'
def test_generate_graph_insufficient_data(self):
"""
Test graph generation returns None with insufficient data.
"""
graphs = ServerMetricsGraphs('abc123', 'Test Server')
# Only one data point - should return None
graphs.add_data_point(50.0, 1000.0)
assert graphs.generate_cpu_graph() is None
assert graphs.generate_memory_graph() is None
assert graphs.generate_combined_graph() is None
class TestServerMetricsManager:
"""Test server metrics manager."""
def test_initialization(self):
"""
Test manager initialization with empty state.
"""
manager = ServerMetricsManager()
assert len(manager.server_graphs) == 0
def test_get_or_create_server_graphs(self):
"""
Test getting or creating server graphs returns same instance.
"""
manager = ServerMetricsManager()
graphs1 = manager.get_or_create_server_graphs('abc123', 'Test Server')
graphs2 = manager.get_or_create_server_graphs('abc123', 'Test Server')
assert graphs1 is graphs2 # Should return same instance
assert len(manager.server_graphs) == 1
def test_add_server_data(self):
"""
Test adding data through manager properly creates graphs.
"""
manager = ServerMetricsManager()
manager.add_server_data('abc123', 'Test Server', 50.0, 1024.0)
graphs = manager.get_server_graphs('abc123')
assert graphs is not None
assert len(graphs.data_points) == 1
def test_remove_server(self):
"""
Test removing server from tracking.
"""
manager = ServerMetricsManager()
manager.add_server_data('abc123', 'Test Server', 50.0, 1024.0)
assert 'abc123' in manager.server_graphs
manager.remove_server('abc123')
assert 'abc123' not in manager.server_graphs
def test_cleanup_old_servers(self):
"""
Test cleanup of inactive servers not in active list.
"""
manager = ServerMetricsManager()
# Add data for 3 servers
manager.add_server_data('server1', 'Server 1', 50.0, 1024.0)
manager.add_server_data('server2', 'Server 2', 60.0, 2048.0)
manager.add_server_data('server3', 'Server 3', 70.0, 3072.0)
# Only server1 and server2 are still active
manager.cleanup_old_servers(['server1', 'server2'])
assert 'server1' in manager.server_graphs
assert 'server2' in manager.server_graphs
assert 'server3' not in manager.server_graphs
def test_get_summary(self):
"""
Test getting manager summary with statistics.
"""
manager = ServerMetricsManager()
# Add some servers with varying data
manager.add_server_data('server1', 'Server 1', 50.0, 1024.0)
manager.add_server_data('server1', 'Server 1', 55.0, 1100.0)
manager.add_server_data('server2', 'Server 2', 60.0, 2048.0)
summary = manager.get_summary()
assert summary['total_servers'] == 2
assert summary['servers_with_data'] == 1 # Only server1 has >=2 points
assert summary['total_data_points'] == 3
# ==========================================
# DISCORD BOT TESTS
# ==========================================
class TestServerStatusView:
"""Test Discord UI view for server status."""
@pytest.mark.asyncio
async def test_view_initialization(self, mock_pterodactyl_api, sample_server_data):
"""
Test view initialization with server data.
Args:
mock_pterodactyl_api: Pytest fixture providing mocked API instance
sample_server_data: Pytest fixture providing sample server data
"""
view = ServerStatusView(
'abc123',
'Test Server',
mock_pterodactyl_api,
sample_server_data
)
assert view.server_id == 'abc123'
assert view.server_name == 'Test Server'
assert view.api is mock_pterodactyl_api
@pytest.mark.asyncio
async def test_interaction_check_authorized(self, mock_pterodactyl_api,
sample_server_data, mock_discord_interaction):
"""
Test interaction check with authorized user having required role.
Args:
mock_pterodactyl_api: Pytest fixture providing mocked API instance
sample_server_data: Pytest fixture providing sample server data
mock_discord_interaction: Pytest fixture providing mocked Discord interaction
"""
view = ServerStatusView('abc123', 'Test Server',
mock_pterodactyl_api, sample_server_data)
result = await view.interaction_check(mock_discord_interaction)
assert result is True
@pytest.mark.asyncio
async def test_interaction_check_wrong_guild(self, mock_pterodactyl_api,
sample_server_data, mock_discord_interaction):
"""
Test interaction check rejects wrong guild.
Args:
mock_pterodactyl_api: Pytest fixture providing mocked API instance
sample_server_data: Pytest fixture providing sample server data
mock_discord_interaction: Pytest fixture providing mocked Discord interaction
"""
view = ServerStatusView('abc123', 'Test Server',
mock_pterodactyl_api, sample_server_data)
mock_discord_interaction.guild_id = 999999999 # Wrong guild
result = await view.interaction_check(mock_discord_interaction)
assert result is False
mock_discord_interaction.response.send_message.assert_called_once()
class TestPterodactylBot:
"""Test main bot class."""
@pytest.mark.asyncio
async def test_bot_initialization(self):
"""
Test bot initialization with default values.
"""
intents = discord.Intents.default()
bot = PterodactylBot(command_prefix="!", intents=intents)
assert bot.server_cache == {}
assert bot.embed_locations == {}
assert bot.metrics_manager is not None
@pytest.mark.asyncio
async def test_track_new_embed(self):
"""
Test tracking new embed location in storage.
"""
intents = discord.Intents.default()
bot = PterodactylBot(command_prefix="!", intents=intents)
mock_message = Mock()
mock_message.channel = Mock()
mock_message.channel.id = 123456
mock_message.id = 789012
with patch.object(bot, 'save_embed_locations', new=AsyncMock()):
await bot.track_new_embed('abc123', mock_message)
assert 'abc123' in bot.embed_locations
assert bot.embed_locations['abc123']['channel_id'] == '123456'
assert bot.embed_locations['abc123']['message_id'] == '789012'
@pytest.mark.asyncio
async def test_load_embed_locations(self, tmp_path):
"""
Test loading embed locations from JSON file.
Args:
tmp_path: Pytest fixture providing temporary directory
"""
intents = discord.Intents.default()
bot = PterodactylBot(command_prefix="!", intents=intents)
# Create temporary embed locations file
embed_file = tmp_path / "embed_locations.json"
test_data = {
'abc123': {
'channel_id': '123456',
'message_id': '789012'
}
}
embed_file.write_text(json.dumps(test_data))
bot.embed_storage_path = embed_file
await bot.load_embed_locations()
assert 'abc123' in bot.embed_locations
assert bot.embed_locations['abc123']['channel_id'] == '123456'
@pytest.mark.asyncio
async def test_save_embed_locations(self, tmp_path):
"""
Test saving embed locations to JSON file.
Args:
tmp_path: Pytest fixture providing temporary directory
"""
intents = discord.Intents.default()
bot = PterodactylBot(command_prefix="!", intents=intents)
embed_file = tmp_path / "embed_locations.json"
bot.embed_storage_path = embed_file
bot.embed_locations = {
'abc123': {
'channel_id': '123456',
'message_id': '789012'
}
}
await bot.save_embed_locations()
assert embed_file.exists()
loaded_data = json.loads(embed_file.read_text())
assert loaded_data == bot.embed_locations
# ==========================================
# INTEGRATION TESTS
# ==========================================
class TestIntegration:
"""Integration tests for complete workflows."""
@pytest.mark.asyncio
async def test_server_status_command_flow(self, mock_discord_interaction,
sample_server_data, sample_resources_data):
"""
Test complete server status command flow.
Args:
mock_discord_interaction: Pytest fixture providing mocked Discord interaction
sample_server_data: Pytest fixture providing sample server data
sample_resources_data: Pytest fixture providing sample resource data
"""
# This would require extensive mocking of Discord.py internals
# Simplified test to verify command registration
intents = discord.Intents.default()
bot = PterodactylBot(command_prefix="!", intents=intents)
# Verify command exists in tree
assert bot.tree is not None
@pytest.mark.asyncio
async def test_metrics_collection_and_graphing(self):
"""
Test complete metrics collection and graph generation flow.
"""
manager = ServerMetricsManager()
# Simulate data collection over time
for i in range(6):
cpu = 50.0 + (i * 5)
memory = 1000.0 + (i * 100)
manager.add_server_data('test_server', 'Test Server', cpu, memory)
graphs = manager.get_server_graphs('test_server')
assert graphs is not None
assert graphs.has_sufficient_data
# Generate graphs
cpu_graph = graphs.generate_cpu_graph()
memory_graph = graphs.generate_memory_graph()
combined_graph = graphs.generate_combined_graph()
# Verify graphs were generated
assert cpu_graph is not None
assert memory_graph is not None
assert combined_graph is not None
# ==========================================
# RUN TESTS
# ==========================================
if __name__ == '__main__':
pytest.main([__file__, '-v', '--tb=short'])