Skip to content

Commit a422e3c

Browse files
author
ivanmkc@google.com
committed
feat(plugins): add thread-safe LogCollectorPlugin for event logging
This commit introduces the `LogCollectorPlugin`, a plugin that collects execution logs from ADK callbacks in asynchronous environments. Logs are organized by session ID for retrieval. The plugin uses `asyncio.Lock` to ensure thread-safe logging. Logs are stored in a `defaultdict(list)` indexed by session ID and can be retrieved with the `get_logs_by_session` method. The plugin implements the following callback hooks to log contextual data: - `on_user_message` - `before_run`, `after_run` - `before_agent`, `after_agent` - `before_model`, `after_model`, `on_model_error` - `before_tool`, `after_tool`, `on_tool_error` - `on_event` Each log entry includes the callback type and relevant data, such as invocation ID, agent name, tool name, function call ID, arguments, and results or errors.
1 parent 5900273 commit a422e3c

File tree

2 files changed

+572
-0
lines changed

2 files changed

+572
-0
lines changed
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
import asyncio
18+
from collections import defaultdict
19+
from typing import Any, Optional, Dict, List, TYPE_CHECKING
20+
21+
from google.adk.agents.base_agent import BaseAgent
22+
from google.adk.agents.callback_context import CallbackContext
23+
from google.adk.events.event import Event
24+
from google.adk.models.llm_request import LlmRequest
25+
from google.adk.models.llm_response import LlmResponse
26+
from google.adk.plugins.base_plugin import BasePlugin
27+
from google.adk.tools.base_tool import BaseTool
28+
from google.adk.tools.tool_context import ToolContext
29+
from google.genai import types
30+
31+
if TYPE_CHECKING:
32+
from google.adk.agents.invocation_context import InvocationContext
33+
34+
35+
class LogCollectorPlugin(BasePlugin):
36+
"""
37+
A plugin to programmatically and safely collect execution details from all
38+
callbacks in async environments, organized by session ID.
39+
"""
40+
41+
def __init__(self, name: str = "log_collector"):
42+
super().__init__(name)
43+
self.logs: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
44+
self._lock = asyncio.Lock()
45+
46+
async def _log_entry(self, session_id: str, callback_type: str, data: Dict[str, Any]):
47+
log_entry = {"callback_type": callback_type, **data}
48+
async with self._lock:
49+
self.logs[session_id].append(log_entry)
50+
51+
async def on_user_message_callback(
52+
self, *, invocation_context: "InvocationContext", user_message: types.Content
53+
) -> Optional[types.Content]:
54+
session_id = invocation_context.session.id
55+
await self._log_entry(
56+
session_id,
57+
"on_user_message",
58+
{
59+
"invocation_id": invocation_context.invocation_id,
60+
"user_message": user_message.parts[0].text,
61+
},
62+
)
63+
return None
64+
65+
async def before_run_callback(
66+
self, *, invocation_context: "InvocationContext"
67+
) -> Optional[types.Content]:
68+
session_id = invocation_context.session.id
69+
await self._log_entry(
70+
session_id,
71+
"before_run",
72+
{
73+
"invocation_id": invocation_context.invocation_id,
74+
"agent_name": invocation_context.agent.name,
75+
},
76+
)
77+
return None
78+
79+
async def after_run_callback(
80+
self, *, invocation_context: "InvocationContext"
81+
) -> None:
82+
session_id = invocation_context.session.id
83+
await self._log_entry(
84+
session_id,
85+
"after_run",
86+
{
87+
"invocation_id": invocation_context.invocation_id,
88+
"agent_name": invocation_context.agent.name,
89+
},
90+
)
91+
return None
92+
93+
async def before_agent_callback(
94+
self, *, agent: BaseAgent, callback_context: CallbackContext
95+
) -> Optional[types.Content]:
96+
session_id = callback_context._invocation_context.session.id
97+
await self._log_entry(
98+
session_id,
99+
"before_agent",
100+
{
101+
"agent_name": agent.name,
102+
"invocation_id": callback_context.invocation_id,
103+
},
104+
)
105+
return None
106+
107+
async def after_agent_callback(
108+
self, *, agent: BaseAgent, callback_context: CallbackContext
109+
) -> Optional[types.Content]:
110+
session_id = callback_context._invocation_context.session.id
111+
await self._log_entry(
112+
session_id,
113+
"after_agent",
114+
{
115+
"agent_name": agent.name,
116+
"invocation_id": callback_context.invocation_id,
117+
},
118+
)
119+
return None
120+
121+
async def before_model_callback(
122+
self, *, callback_context: CallbackContext, llm_request: LlmRequest
123+
) -> Optional[LlmResponse]:
124+
session_id = callback_context._invocation_context.session.id
125+
await self._log_entry(
126+
session_id,
127+
"before_model",
128+
{
129+
"agent_name": callback_context.agent_name,
130+
"request": llm_request.model_dump(),
131+
},
132+
)
133+
return None
134+
135+
async def after_model_callback(
136+
self, *, callback_context: CallbackContext, llm_response: LlmResponse
137+
) -> Optional[LlmResponse]:
138+
session_id = callback_context._invocation_context.session.id
139+
await self._log_entry(
140+
session_id,
141+
"after_model",
142+
{
143+
"agent_name": callback_context.agent_name,
144+
"response": llm_response.model_dump(),
145+
},
146+
)
147+
return None
148+
149+
async def on_model_error_callback(
150+
self,
151+
*,
152+
callback_context: CallbackContext,
153+
llm_request: LlmRequest,
154+
error: Exception,
155+
) -> Optional[LlmResponse]:
156+
session_id = callback_context._invocation_context.session.id
157+
await self._log_entry(
158+
session_id,
159+
"on_model_error",
160+
{
161+
"agent_name": callback_context.agent_name,
162+
"request": llm_request.model_dump(),
163+
"error": str(error),
164+
},
165+
)
166+
return None
167+
168+
async def on_event_callback(
169+
self, *, invocation_context: "InvocationContext", event: Event
170+
) -> Optional[Event]:
171+
session_id = invocation_context.session.id
172+
await self._log_entry(
173+
session_id,
174+
"on_event",
175+
{
176+
"event_id": event.id,
177+
"author": event.author,
178+
"content": event.content.parts[0].text,
179+
"is_final": event.is_final_response(),
180+
},
181+
)
182+
return None
183+
184+
async def before_tool_callback(
185+
self,
186+
*,
187+
tool: BaseTool,
188+
tool_args: Dict[str, Any],
189+
tool_context: ToolContext,
190+
) -> Optional[Dict]:
191+
session_id = tool_context._invocation_context.session.id
192+
await self._log_entry(
193+
session_id,
194+
"before_tool",
195+
{
196+
"tool_name": tool.name,
197+
"agent_name": tool_context.agent_name,
198+
"function_call_id": tool_context.function_call_id,
199+
"args": tool_args,
200+
},
201+
)
202+
return None
203+
204+
async def after_tool_callback(
205+
self,
206+
*,
207+
tool: BaseTool,
208+
tool_args: Dict[str, Any],
209+
tool_context: ToolContext,
210+
result: Dict,
211+
) -> Optional[Dict]:
212+
session_id = tool_context._invocation_context.session.id
213+
await self._log_entry(
214+
session_id,
215+
"after_tool",
216+
{
217+
"tool_name": tool.name,
218+
"agent_name": tool_context.agent_name,
219+
"function_call_id": tool_context.function_call_id,
220+
"args": tool_args,
221+
"result": result,
222+
},
223+
)
224+
return None
225+
226+
async def on_tool_error_callback(
227+
self,
228+
*,
229+
tool: BaseTool,
230+
tool_args: Dict[str, Any],
231+
tool_context: ToolContext,
232+
error: Exception,
233+
) -> Optional[Dict]:
234+
session_id = tool_context._invocation_context.session.id
235+
await self._log_entry(
236+
session_id,
237+
"on_tool_error",
238+
{
239+
"tool_name": tool.name,
240+
"agent_name": tool_context.agent_name,
241+
"function_call_id": tool_context.function_call_id,
242+
"args": tool_args,
243+
"error": str(error),
244+
},
245+
)
246+
return None
247+
248+
def get_logs_by_session(self, session_id: str) -> List[Dict[str, Any]]:
249+
"""Retrieve all logs for a specific session."""
250+
return self.logs.get(session_id, [])

0 commit comments

Comments
 (0)