Skip to content

monitoring #970

@MervinPraison

Description

@MervinPraison

@claude

monitoring

💡 Proposed PraisonAI Agents Implementation

1. Granular Token Tracking

from dataclasses import dataclass
from praisonaiagents import Agent, Process

New metrics classes

@dataclass
class TokenMetrics:
    """Comprehensive token tracking for a single LLM call.

    All counters default to 0 so a zero-valued instance is a valid
    additive identity for aggregation via ``+``.
    """
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0

    # Special tokens
    audio_tokens: int = 0
    input_audio_tokens: int = 0
    output_audio_tokens: int = 0
    cached_tokens: int = 0
    cache_write_tokens: int = 0
    reasoning_tokens: int = 0

    def __add__(self, other):
        """Enable metric aggregation: return a new TokenMetrics whose
        counters are the field-wise sums of ``self`` and ``other``.
        """
        # Every field must be summed — omitting any would silently
        # drop its counts during session-level aggregation.
        return TokenMetrics(
            input_tokens=self.input_tokens + other.input_tokens,
            output_tokens=self.output_tokens + other.output_tokens,
            total_tokens=self.total_tokens + other.total_tokens,
            audio_tokens=self.audio_tokens + other.audio_tokens,
            input_audio_tokens=self.input_audio_tokens + other.input_audio_tokens,
            output_audio_tokens=self.output_audio_tokens + other.output_audio_tokens,
            cached_tokens=self.cached_tokens + other.cached_tokens,
            cache_write_tokens=self.cache_write_tokens + other.cache_write_tokens,
            reasoning_tokens=self.reasoning_tokens + other.reasoning_tokens,
        )

Client usage

agent = Agent(
name="Assistant",
model="gpt-4o",
track_metrics=True # Enable detailed metrics
)

Automatic token tracking

response = agent.chat("Analyze this audio file", audio_file="speech.mp3")

Access detailed metrics

print(f"Text tokens: {response.metrics.input_tokens}")
print(f"Audio tokens: {response.metrics.audio_tokens}")
print(f"Cached tokens: {response.metrics.cached_tokens}")

Session-level aggregation

process = Process(agents=[agent])
result = process.run()
print(f"Total session tokens: {result.metrics.total_tokens}")

2. Time-to-First-Token (TTFT) Tracking

from praisonaiagents import Agent
from praisonaiagents.metrics import PerformanceMetrics

TTFT automatically tracked for streaming

agent = Agent(
name="StreamingBot",
model="claude-3",
stream=True,
track_performance=True
)

Stream with TTFT tracking

# Stream the response; chunks print as they arrive so TTFT is observable.
for chunk in agent.stream("Generate a poem"):
    print(chunk, end="")

Access performance metrics

metrics = agent.last_metrics
print(f"\nTime to first token: {metrics.time_to_first_token:.3f}s")
print(f"Total generation time: {metrics.total_time:.3f}s")
print(f"Tokens per second: {metrics.tokens_per_second:.1f}")

3. Session-Level Metric Aggregation

from praisonaiagents import Agent, Task, Process
from praisonaiagents.metrics import MetricsCollector

Automatic session aggregation

collector = MetricsCollector()

agent1 = Agent(name="Researcher", metrics_collector=collector)
agent2 = Agent(name="Writer", metrics_collector=collector)

process = Process(
agents=[agent1, agent2],
metrics_collector=collector
)

result = process.run()

Get aggregated metrics

session_metrics = collector.get_session_metrics()
print(f"Total tokens used: {session_metrics.total_tokens}")
print(f"By agent:")
# Per-agent breakdown of the aggregated session metrics.
for agent_name, metrics in session_metrics.by_agent.items():
    print(f" {agent_name}: {metrics.total_tokens} tokens")

Export metrics

collector.export_metrics("metrics.json")

4. Enhanced Telemetry Integration

from praisonaiagents import Agent
from praisonaiagents.telemetry import TelemetryConfig, MetricsExporter

Configure telemetry

config = TelemetryConfig(
# Privacy-first defaults
track_tokens=True,
track_performance=True,
track_errors=True,

# Optional integrations
export_to_opentelemetry=True,
export_to_prometheus=False,

# Control via environment
respect_env_vars=True  # PRAISONAI_TELEMETRY_DISABLED

)

Option 1: Global configuration

Agent.configure_telemetry(config)

Option 2: Per-agent configuration

agent = Agent(
name="Assistant",
telemetry_config=config
)

Option 3: Custom exporters

class CustomExporter(MetricsExporter):
    """Example exporter that forwards metrics to an external sink."""

    def export(self, metrics):
        """Send one metrics snapshot to your monitoring system.

        NOTE(review): `send_to_datadog` is a placeholder — wire in the
        real client for your backend.
        """
        # Send to your monitoring system
        send_to_datadog(metrics)

agent = Agent(
name="Assistant",
metrics_exporters=[CustomExporter()]
)

Real-time metrics access

# Context manager scopes metric collection to the enclosed call(s);
# `tracker` exposes live counters while execution is in flight.
with agent.track_metrics() as tracker:
    response = agent.chat("Complex task")

    # Mid-execution metrics
    print(f"Current tokens: {tracker.current_tokens}")
    print(f"Elapsed time: {tracker.elapsed_time}")

📋 Implementation Recommendations

1. Phased Rollout

Phase 1: Core Token Metrics

Add to praisonaiagents/metrics.py

class TokenMetrics:
    """Basic token tracking (input/output/total counters) — Phase 1 skeleton."""
    ...


class PerformanceMetrics:
    """Time tracking including time-to-first-token (TTFT) — Phase 1 skeleton."""
    ...
Phase 2: Provider Integration

Update each LLM provider to extract metrics

def _extract_openai_metrics(response):
    """Map an OpenAI chat-completion response's usage block to TokenMetrics.

    NOTE(review): in the official OpenAI SDK, `response.usage` is a typed
    object, not a dict — confirm `.get()` exists, or use
    `getattr(response.usage, 'cached_tokens', 0)` / the
    `usage.prompt_tokens_details.cached_tokens` field instead.
    """
    return TokenMetrics(
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
        cached_tokens=response.usage.get('cached_tokens', 0),
    )
Phase 3: Aggregation & Export

Add session-level aggregation

class MetricsCollector:
    """Phase 3 skeleton: session-level aggregation and export of metrics."""

    def aggregate_by_agent(self):
        """Roll up collected metrics keyed by agent name."""
        ...

    def aggregate_by_task(self):
        """Roll up collected metrics keyed by task."""
        ...

    def export_to_json(self):
        """Serialize aggregated metrics to JSON."""
        ...

    def export_to_opentelemetry(self):
        """Push aggregated metrics to an OpenTelemetry exporter."""
        ...

2. Integration with Existing Telemetry

Extend existing MinimalTelemetry

class EnhancedTelemetry(MinimalTelemetry):
    """MinimalTelemetry extended with token and performance events.

    NOTE(review): relies on the base class providing `_post_event` —
    confirm its signature (event name, payload dict).
    """

    def track_tokens(self, metrics: TokenMetrics):
        """Emit a 'tokens_used' event from a TokenMetrics snapshot."""
        self._post_event("tokens_used", {
            "total": metrics.total_tokens,
            "cached": metrics.cached_tokens,
            "reasoning": metrics.reasoning_tokens
        })

    def track_performance(self, metrics: PerformanceMetrics):
        """Emit a 'performance' event from a PerformanceMetrics snapshot."""
        self._post_event("performance", {
            "ttft": metrics.time_to_first_token,
            "total_time": metrics.total_time,
            "tokens_per_second": metrics.tokens_per_second
        })

3. Backward Compatibility

Maintain simple API while adding advanced features

agent = Agent(name="Bot") # Works as before

Opt-in to enhanced monitoring

agent = Agent(
name="Bot",
track_metrics=True # New feature
)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions