-
-
Notifications
You must be signed in to change notification settings - Fork 727
Description
monitoring
💡 Proposed PraisonAI Agents Implementation
1. Granular Token Tracking
from dataclasses import dataclass
from praisonaiagents import Agent, Process

# New metrics classes
@dataclass
class TokenMetrics:
"""Comprehensive token tracking"""
input_tokens: int = 0
output_tokens: int = 0
total_tokens: int = 0

# Special tokens
audio_tokens: int = 0
input_audio_tokens: int = 0
output_audio_tokens: int = 0
cached_tokens: int = 0
cache_write_tokens: int = 0
reasoning_tokens: int = 0

def __add__(self, other):
    """Enable metric aggregation"""
    return TokenMetrics(
        input_tokens=self.input_tokens + other.input_tokens,
        output_tokens=self.output_tokens + other.output_tokens,
        # ... aggregate all fields
    )
Client usage
agent = Agent(
name="Assistant",
model="gpt-4o",
track_metrics=True # Enable detailed metrics
)

# Automatic token tracking
response = agent.chat("Analyze this audio file", audio_file="speech.mp3")
Access detailed metrics
print(f"Text tokens: {response.metrics.input_tokens}")
print(f"Audio tokens: {response.metrics.audio_tokens}")
print(f"Cached tokens: {response.metrics.cached_tokens}")

# Session-level aggregation
process = Process(agents=[agent])
result = process.run()
print(f"Total session tokens: {result.metrics.total_tokens}")

2. Time-to-First-Token (TTFT) Tracking
from praisonaiagents import Agent
from praisonaiagents.metrics import PerformanceMetrics

# TTFT automatically tracked for streaming
agent = Agent(
name="StreamingBot",
model="claude-3",
stream=True,
track_performance=True
)

# Stream with TTFT tracking
for chunk in agent.stream("Generate a poem"):
    print(chunk, end="")

# Access performance metrics
metrics = agent.last_metrics
print(f"\nTime to first token: {metrics.time_to_first_token:.3f}s")
print(f"Total generation time: {metrics.total_time:.3f}s")
print(f"Tokens per second: {metrics.tokens_per_second:.1f}")

3. Session-Level Metric Aggregation
from praisonaiagents import Agent, Task, Process
from praisonaiagents.metrics import MetricsCollector

# Automatic session aggregation
collector = MetricsCollector()
agent1 = Agent(name="Researcher", metrics_collector=collector)
agent2 = Agent(name="Writer", metrics_collector=collector)

process = Process(
agents=[agent1, agent2],
metrics_collector=collector
)

result = process.run()
Get aggregated metrics
session_metrics = collector.get_session_metrics()
print(f"Total tokens used: {session_metrics.total_tokens}")
print(f"By agent:")
for agent_name, metrics in session_metrics.by_agent.items():
    print(f"  {agent_name}: {metrics.total_tokens} tokens")

# Export metrics
collector.export_metrics("metrics.json")
4. Enhanced Telemetry Integration
from praisonaiagents import Agent
from praisonaiagents.telemetry import TelemetryConfig, MetricsExporter

# Configure telemetry
config = TelemetryConfig(
# Privacy-first defaults
track_tokens=True,
track_performance=True,
    track_errors=True,

    # Optional integrations
    export_to_opentelemetry=True,
    export_to_prometheus=False,

    # Control via environment
    respect_env_vars=True  # PRAISONAI_TELEMETRY_DISABLED
)
Option 1: Global configuration
Agent.configure_telemetry(config)
Option 2: Per-agent configuration
agent = Agent(
name="Assistant",
telemetry_config=config
)

Option 3: Custom exporters
class CustomExporter(MetricsExporter):
def export(self, metrics):
# Send to your monitoring system
        send_to_datadog(metrics)

agent = Agent(
name="Assistant",
metrics_exporters=[CustomExporter()]
)

# Real-time metrics access
with agent.track_metrics() as tracker:
    response = agent.chat("Complex task")

    # Mid-execution metrics
    print(f"Current tokens: {tracker.current_tokens}")
    print(f"Elapsed time: {tracker.elapsed_time}")
📋 Implementation Recommendations
1. Phased Rollout
Phase 1: Core Token Metrics
Add to praisonaiagents/metrics.py
class TokenMetrics:
    # Basic token tracking

class PerformanceMetrics:
# Time tracking including TTFT
Phase 2: Provider Integration
Update each LLM provider to extract metrics
def _extract_openai_metrics(response):
return TokenMetrics(
input_tokens=response.usage.prompt_tokens,
output_tokens=response.usage.completion_tokens,
cached_tokens=response.usage.get('cached_tokens', 0)
)
Phase 3: Aggregation & Export
Add session-level aggregation
class MetricsCollector:
    def aggregate_by_agent(self): ...
    def aggregate_by_task(self): ...
    def export_to_json(self): ...
    def export_to_opentelemetry(self): ...

2. Integration with Existing Telemetry
Extend existing MinimalTelemetry
class EnhancedTelemetry(MinimalTelemetry):
def track_tokens(self, metrics: TokenMetrics):
self._post_event("tokens_used", {
"total": metrics.total_tokens,
"cached": metrics.cached_tokens,
"reasoning": metrics.reasoning_tokens
        })

    def track_performance(self, metrics: PerformanceMetrics):
        self._post_event("performance", {
            "ttft": metrics.time_to_first_token,
            "total_time": metrics.total_time,
            "tokens_per_second": metrics.tokens_per_second
        })
3. Backward Compatibility
Maintain simple API while adding advanced features
agent = Agent(name="Bot") # Works as before
Opt-in to enhanced monitoring
agent = Agent(
name="Bot",
track_metrics=True # New feature
)