
Commit 2bf924b

Add RPM control to both agents and crews (#133)
* moving file into utilities
* creating Logger and RPMController
* Adding support for RPM to agents and crew
1 parent 3686804 commit 2bf924b

16 files changed: +2343 -85 lines changed
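
The change surfaces as a new optional max_rpm field on both Agent and Crew. A minimal usage sketch; the surrounding constructor arguments and the top-level crewai imports are illustrative assumptions, not part of this diff:

    from crewai import Agent, Crew, Task

    researcher = Agent(
        role="Researcher",
        goal="Find relevant sources",
        backstory="An analyst who digs through papers.",
        max_rpm=10,  # per-agent request limit
    )

    crew = Crew(
        agents=[researcher],
        tasks=[Task(description="Summarize recent findings", agent=researcher)],
        max_rpm=5,  # crew-wide request limit
        verbose=True,
    )

    crew.kickoff()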

src/crewai/agent.py

Lines changed: 32 additions & 7 deletions
@@ -24,8 +24,7 @@
     CrewAgentOutputParser,
     ToolsHandler,
 )
-from crewai.i18n import I18N
-from crewai.prompts import Prompts
+from crewai.utilities import I18N, Logger, Prompts, RPMController


 class Agent(BaseModel):
@@ -42,11 +41,14 @@ class Agent(BaseModel):
         llm: The language model that will run the agent.
         max_iter: Maximum number of iterations for an agent to execute a task.
         memory: Whether the agent should have memory or not.
+        max_rpm: Maximum number of requests per minute for the agent execution to be respected.
         verbose: Whether the agent execution should be in verbose mode.
         allow_delegation: Whether the agent is allowed to delegate tasks to other agents.
     """

     __hash__ = object.__hash__
+    _logger: Logger = PrivateAttr()
+    _rpm_controller: RPMController = PrivateAttr(default=None)
     _request_within_rpm_limit: Any = PrivateAttr(default=None)

     model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -58,6 +60,10 @@
     role: str = Field(description="Role of the agent")
     goal: str = Field(description="Objective of the agent")
     backstory: str = Field(description="Backstory of the agent")
+    max_rpm: Optional[int] = Field(
+        default=None,
+        description="Maximum number of requests per minute for the agent execution to be respected.",
+    )
     memory: bool = Field(
         default=True, description="Whether the agent should have memory or not"
     )
@@ -101,6 +107,15 @@ def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
             "may_not_set_field", "This field is not to be set by the user.", {}
         )

+    @model_validator(mode="after")
+    def set_private_attrs(self):
+        self._logger = Logger(self.verbose)
+        if self.max_rpm and not self._rpm_controller:
+            self._rpm_controller = RPMController(
+                max_rpm=self.max_rpm, logger=self._logger
+            )
+        return self
+
     @model_validator(mode="after")
     def check_agent_executor(self) -> "Agent":
         if not self.agent_executor:
@@ -128,7 +143,7 @@ def execute_task(
         tools = tools or self.tools
         self.agent_executor.tools = tools

-        return self.agent_executor.invoke(
+        result = self.agent_executor.invoke(
             {
                 "input": task,
                 "tool_names": self.__tools_names(tools),
@@ -137,14 +152,20 @@
             RunnableConfig(callbacks=[self.tools_handler]),
         )["output"]

+        if self.max_rpm:
+            self._rpm_controller.stop_rpm_counter()
+
+        return result
+
     def set_cache_handler(self, cache_handler) -> None:
         self.cache_handler = cache_handler
         self.tools_handler = ToolsHandler(cache=self.cache_handler)
         self.__create_agent_executor()

-    def set_request_within_rpm_limit(self, ensure_function) -> None:
-        self._request_within_rpm_limit = ensure_function
-        self.__create_agent_executor()
+    def set_rpm_controller(self, rpm_controller) -> None:
+        if not self._rpm_controller:
+            self._rpm_controller = rpm_controller
+            self.__create_agent_executor()

     def __create_agent_executor(self) -> CrewAgentExecutor:
         """Create an agent executor for the agent.
@@ -164,9 +185,13 @@ def __create_agent_executor(self) -> CrewAgentExecutor:
             "verbose": self.verbose,
             "handle_parsing_errors": True,
             "max_iterations": self.max_iter,
-            "request_within_rpm_limit": self._request_within_rpm_limit,
         }

+        if self._rpm_controller:
+            executor_args[
+                "request_within_rpm_limit"
+            ] = self._rpm_controller.check_or_wait
+
         if self.memory:
             summary_memory = ConversationSummaryMemory(
                 llm=self.llm, input_key="input", memory_key="chat_history"
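
Note the interplay between the two wiring paths above: set_private_attrs builds a private RPMController only when the agent declares its own max_rpm, while set_rpm_controller (called by the crew for every agent, see crew.py below) assigns the crew-level controller only when the agent does not already have one. An agent-level max_rpm therefore takes precedence over the crew-level one. A hypothetical illustration, with made-up constructor values:

    # Keeps its own RPMController(max_rpm=10); the crew's later
    # set_rpm_controller(...) call is a no-op for this agent.
    limited = Agent(role="Writer", goal="Draft copy", backstory="...", max_rpm=10)

    # Has no controller of its own, so it adopts whichever controller
    # the crew injects via set_rpm_controller.
    shared = Agent(role="Editor", goal="Review copy", backstory="...")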

src/crewai/agents/exceptions.py

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 from langchain_core.exceptions import OutputParserException

-from crewai.i18n import I18N
+from crewai.utilities import I18N


 class TaskRepeatedUsageException(OutputParserException):

src/crewai/agents/executor.py

Lines changed: 1 addition & 1 deletion
@@ -12,8 +12,8 @@
 from langchain_core.utils.input import get_color_mapping

 from crewai.agents.cache.cache_hit import CacheHit
-from crewai.i18n import I18N
 from crewai.tools.cache_tools import CacheTools
+from crewai.utilities import I18N


 class CrewAgentExecutor(AgentExecutor):

src/crewai/agents/output_parser.py

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@
 from crewai.agents.cache import CacheHandler, CacheHit
 from crewai.agents.exceptions import TaskRepeatedUsageException
 from crewai.agents.tools_handler import ToolsHandler
-from crewai.i18n import I18N
+from crewai.utilities import I18N

 FINAL_ANSWER_ACTION = "Final Answer:"
 FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE = (

src/crewai/crew.py

Lines changed: 25 additions & 57 deletions
@@ -1,8 +1,6 @@
 import json
-import threading
-import time
 import uuid
-from typing import Any, ClassVar, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Union

 from pydantic import (
     UUID4,
@@ -19,10 +17,10 @@

 from crewai.agent import Agent
 from crewai.agents.cache import CacheHandler
-from crewai.i18n import I18N
 from crewai.process import Process
 from crewai.task import Task
 from crewai.tools.agent_tools import AgentTools
+from crewai.utilities import I18N, Logger, RPMController


 class Crew(BaseModel):
@@ -37,23 +35,26 @@ class Crew(BaseModel):
         config: Configuration settings for the crew.
         cache_handler: Handles caching for the crew's operations.
         max_rpm: Maximum number of requests per minute for the crew execution to be respected.
-        rpm: Current number of requests per minute for the crew execution.
         id: A unique identifier for the crew instance.
     """

     __hash__ = object.__hash__
-    _timer: Optional[threading.Timer] = PrivateAttr(default=None)
-    lock: ClassVar[threading.Lock] = threading.Lock()
-    rpm: ClassVar[int] = 0
-    max_rpm: Optional[int] = Field(default=None)
+    _rpm_controller: RPMController = PrivateAttr()
+    _logger: Logger = PrivateAttr()
+    _cache_handler: Optional[InstanceOf[CacheHandler]] = PrivateAttr(
+        default=CacheHandler()
+    )
     model_config = ConfigDict(arbitrary_types_allowed=True)
     tasks: List[Task] = Field(default_factory=list)
     agents: List[Agent] = Field(default_factory=list)
     process: Process = Field(default=Process.sequential)
     verbose: Union[int, bool] = Field(default=0)
     config: Optional[Union[Json, Dict[str, Any]]] = Field(default=None)
-    cache_handler: Optional[InstanceOf[CacheHandler]] = Field(default=CacheHandler())
     id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
+    max_rpm: Optional[int] = Field(
+        default=None,
+        description="Maximum number of requests per minute for the crew execution to be respected.",
+    )
     language: str = Field(
         default="en",
         description="Language used for the crew, defaults to English.",
@@ -74,9 +75,10 @@ def check_config_type(cls, v: Union[Json, Dict[str, Any]]):
         return json.loads(v) if isinstance(v, Json) else v

     @model_validator(mode="after")
-    def set_reset_counter(self):
-        if self.max_rpm:
-            self._reset_request_count()
+    def set_private_attrs(self):
+        self._cache_handler = CacheHandler()
+        self._logger = Logger(self.verbose)
+        self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
         return self

     @model_validator(mode="after")
@@ -94,8 +96,8 @@ def check_config(self):

         if self.agents:
             for agent in self.agents:
-                agent.set_cache_handler(self.cache_handler)
-                agent.set_request_within_rpm_limit(self.ensure_request_within_rpm_limit)
+                agent.set_cache_handler(self._cache_handler)
+                agent.set_rpm_controller(self._rpm_controller)
         return self

     def _setup_from_config(self):
@@ -116,28 +118,9 @@ def _create_task(self, task_config):
         del task_config["agent"]
         return Task(**task_config, agent=task_agent)

-    def ensure_request_within_rpm_limit(self):
-        if not self.max_rpm:
-            return True
-
-        with Crew.lock:
-            if Crew.rpm < self.max_rpm:
-                Crew.rpm += 1
-                return True
-            self._log("info", "Max RPM reached, waiting for next minute to start.")
-
-        return self._wait_for_next_minute()
-
-    def _wait_for_next_minute(self):
-        time.sleep(60)
-        with Crew.lock:
-            Crew.rpm = 0
-        return True
-
     def kickoff(self) -> str:
         """Starts the crew to work on its assigned tasks."""
         for agent in self.agents:
-            agent.cache_handler = self.cache_handler
             agent.i18n = I18N(language=self.language)

         if self.process == Process.sequential:
@@ -149,33 +132,18 @@ def _sequential_loop(self) -> str:
         for task in self.tasks:
             self._prepare_and_execute_task(task)
             task_output = task.execute(task_output)
-            self._log("debug", f"\n[{task.agent.role}] Task output: {task_output}\n\n")
-        self._stop_timer()
+            self._logger.log(
+                "debug", f"[{task.agent.role}] Task output: {task_output}\n\n"
+            )
+
+        if self.max_rpm:
+            self._rpm_controller.stop_rpm_counter()
         return task_output

     def _prepare_and_execute_task(self, task):
         """Prepares and logs information about the task being executed."""
         if task.agent.allow_delegation:
             task.tools += AgentTools(agents=self.agents).tools()

-        self._log("debug", f"Working Agent: {task.agent.role}")
-        self._log("info", f"Starting Task: {task.description}")
-
-    def _log(self, level, message):
-        """Logs a message at the specified verbosity level."""
-        level_map = {"debug": 1, "info": 2}
-        verbose_level = (
-            2 if isinstance(self.verbose, bool) and self.verbose else self.verbose
-        )
-        if verbose_level and level_map[level] <= verbose_level:
-            print(f"\n{message}")
-
-    def _stop_timer(self):
-        if self._timer:
-            self._timer.cancel()
-
-    def _reset_request_count(self):
-        self._stop_timer()
-        self._timer = threading.Timer(60.0, self._reset_request_count)
-        self._timer.start()
-        Crew.rpm = 0
+        self._logger.log("debug", f"Working Agent: {task.agent.role}")
+        self._logger.log("info", f"Starting Task: {task.description}")

src/crewai/tools/agent_tools.py

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
 from pydantic import BaseModel, Field

 from crewai.agent import Agent
-from crewai.i18n import I18N
+from crewai.utilities import I18N


 class AgentTools(BaseModel):

src/crewai/utilities/__init__.py

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+from .i18n import I18N
+from .logger import Logger
+from .prompts import Prompts
+from .rpm_controller import RPMController

src/crewai/i18n.py renamed to src/crewai/utilities/i18n.py

Lines changed: 3 additions & 1 deletion
@@ -17,7 +17,9 @@ def load_translation(self) -> "I18N":
         """Load translations from a JSON file based on the specified language."""
         try:
             dir_path = os.path.dirname(os.path.realpath(__file__))
-            prompts_path = os.path.join(dir_path, f"translations/{self.language}.json")
+            prompts_path = os.path.join(
+                dir_path, f"../translations/{self.language}.json"
+            )

             with open(prompts_path, "r") as f:
                 self._translations = json.load(f)

src/crewai/utilities/logger.py

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+class Logger:
+    def __init__(self, verbose_level=0):
+        verbose_level = (
+            2 if isinstance(verbose_level, bool) and verbose_level else verbose_level
+        )
+        self.verbose_level = verbose_level
+
+    def log(self, level, message):
+        level_map = {"debug": 1, "info": 2}
+        if self.verbose_level and level_map.get(level, 0) <= self.verbose_level:
+            print(f"\n[{level.upper()}]: {message}")

src/crewai/prompts.py renamed to src/crewai/utilities/prompts.py

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 from langchain.prompts import PromptTemplate
 from pydantic import BaseModel, Field

-from .i18n import I18N
+from crewai.utilities import I18N


 class Prompts(BaseModel):
