A Partial Walkthrough of the MetaGPT Source Code: Role

Date: 2024-04-04 17:42:25

RoleReactMode

class RoleReactMode(str, Enum):
    REACT = "react"
    BY_ORDER = "by_order"
    PLAN_AND_ACT = "plan_and_act"

    @classmethod
    def values(cls):
        return [item.value for item in cls]
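  1. REACT: the standard ReAct mode, which alternates between thinking and acting and uses the LLM to select the next action.
  2. BY_ORDER: runs the actions one after another, in the order in which they were defined.
  3. PLAN_AND_ACT: plans first, then executes a sequence of actions.

The values() class method returns a list containing the string value of every enum member, which makes it easy to check whether a given string is a valid mode.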
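As a small illustration of how values() and the str mixin can be used together, the sketch below validates a raw string and converts it to an enum member. The helper parse_mode is hypothetical and is not part of MetaGPT.

```python
from enum import Enum


class RoleReactMode(str, Enum):
    REACT = "react"
    BY_ORDER = "by_order"
    PLAN_AND_ACT = "plan_and_act"

    @classmethod
    def values(cls):
        return [item.value for item in cls]


def parse_mode(raw: str) -> RoleReactMode:
    """Hypothetical helper: validate a raw string and convert it to the enum."""
    if raw not in RoleReactMode.values():
        raise ValueError(f"react_mode must be one of {RoleReactMode.values()}")
    return RoleReactMode(raw)


mode = parse_mode("by_order")
# Because of the str mixin, a member compares equal to its plain string value.
same_as_string = mode == "by_order"
```

The str mixin is what allows code such as `react_mode == RoleReactMode.REACT` to work when react_mode is passed in as a plain string.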

RoleContext

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # # env exclude=True to avoid `RecursionError: maximum recursion depth exceeded in comparison`
    env: "Environment" = Field(default=None, exclude=True)  # # avoid circular import
    # TODO judge if ser&deser
    msg_buffer: MessageQueue = Field(
        default_factory=MessageQueue, exclude=True
    )  # Message Buffer with Asynchronous Updates
    memory: Memory = Field(default_factory=Memory)
    # long_term_memory: LongTermMemory = Field(default_factory=LongTermMemory)
    state: int = Field(default=-1)  # -1 indicates initial or termination state where todo is None
    todo: Action = Field(default=None, exclude=True)
    watch: set[str] = Field(default_factory=set)
    news: list[Type[Message]] = Field(default=[], exclude=True)  # TODO not used
    react_mode: RoleReactMode = (
        RoleReactMode.REACT
    )  # see `Role._set_react_mode` for definitions of the following two attributes
    max_react_loop: int = 1
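  1. model_config: the pydantic model configuration; arbitrary_types_allowed=True lets fields hold types that pydantic cannot validate natively.

  2. env: the environment the role lives in. It is annotated with the string "Environment" to avoid a circular import, defaults to None, and is excluded from serialization (exclude=True) to avoid a RecursionError.

  3. msg_buffer: the message buffer, of type MessageQueue and created by default_factory=MessageQueue. It is updated asynchronously and is excluded from serialization.

  4. memory: the role's memory, of type Memory and created by default_factory=Memory. It stores the messages the role has observed or produced.

  5. state: the current state, an int that defaults to -1. The value -1 marks the initial or terminated state, in which todo is None.

  6. todo: the next Action to execute. It defaults to None and is excluded from serialization.

  7. watch: a set[str], created by default_factory=set, holding the string names of the Actions whose messages the role is interested in.

  8. news: annotated as list[Type[Message]], defaults to an empty list, and is excluded from serialization. It holds the new messages picked up by the latest observation (the source comment marks it "TODO not used", although _observe assigns to it).

  9. react_mode: a RoleReactMode that defaults to RoleReactMode.REACT; it defines the strategy the role uses to react to observed messages.

  10. max_react_loop: an int that defaults to 1; it caps the number of think-act cycles and only takes effect when react_mode is RoleReactMode.REACT.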

    def check(self, role_id: str):
        # if hasattr(CONFIG, "long_term_memory") and CONFIG.long_term_memory:
        #     self.long_term_memory.recover_memory(role_id, self)
        #     self.memory = self.long_term_memory  # use memory to act as long_term_memory for unify operation
        pass

    @property
    def important_memory(self) -> list[Message]:
        """Retrieve information corresponding to the attention action."""
        return self.memory.get_by_actions(self.watch)

    @property
    def history(self) -> list[Message]:
        return self.memory.get()

important_memory

    def get_by_actions(self, actions: Set) -> list[Message]:
        """Return all messages triggered by specified Actions"""
        rsp = []
        indices = any_to_str_set(actions)
        for action in indices:
            if action not in self.index:
                continue
            rsp += self.index[action]
        return rsp
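  1. Takes one parameter, actions, a Set containing the actions to look up.

  2. Converts that set into a set of strings with any_to_str_set, because the internal index is keyed by strings.

  3. Iterates over the converted set:

    • If the action is not in the index, it is skipped.
    • If it is in the index, the messages associated with it are appended to the result list.
  4. Returns the list of all matching messages.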
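The lookup described above can be sketched with a dictionary that maps action names to the messages they caused. Msg and TinyMemory are simplified stand-ins invented for this example, not MetaGPT's Message and Memory classes.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Msg:
    """Stand-in for Message: only the fields this sketch needs."""
    content: str
    cause_by: str


class TinyMemory:
    def __init__(self):
        self.storage = []                # all messages, in arrival order
        self.index = defaultdict(list)   # action name -> messages it caused

    def add(self, msg: Msg) -> None:
        self.storage.append(msg)
        self.index[msg.cause_by].append(msg)

    def get_by_actions(self, actions: set) -> list:
        rsp = []
        for action in actions:
            if action not in self.index:  # membership test does not create a key
                continue
            rsp += self.index[action]
        return rsp


mem = TinyMemory()
mem.add(Msg("prd v1", "WritePRD"))
mem.add(Msg("code", "WriteCode"))
mem.add(Msg("prd v2", "WritePRD"))
hits = mem.get_by_actions({"WritePRD", "Unknown"})
```

Unknown action names are simply ignored, so a role can watch actions that have not produced any message yet.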

history

    def get(self, k=0) -> list[Message]:
        """Return the most recent k memories, return all when k=0"""
        return self.storage[-k:]

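Returns the most recent k messages in storage. With k=0 the slice storage[-0:] is the same as storage[0:], so all messages are returned.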
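The slicing behaviour is easy to check on a plain list. This is a stand-alone sketch with a module-level storage list standing in for the memory's storage.

```python
storage = ["m1", "m2", "m3"]


def get(k=0):
    """Return the most recent k items; k=0 returns everything."""
    return storage[-k:]


everything = get(0)    # -0 == 0, so this is storage[0:]
last_two = get(2)
more_than_len = get(10)  # a slice start beyond the list length is clamped
```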

Role

    model_config = ConfigDict(arbitrary_types_allowed=True, exclude=["llm"])

    name: str = ""
    profile: str = ""
    goal: str = ""
    constraints: str = ""
    desc: str = ""
    is_human: bool = False

    llm: BaseLLM = Field(default_factory=LLM, exclude=True)  # Each role has its own LLM, use different system message
    role_id: str = ""
    states: list[str] = []
    actions: list[SerializeAsAny[Action]] = Field(default=[], validate_default=True)
    rc: RoleContext = Field(default_factory=RoleContext)
    subscription: set[str] = set()

    # builtin variables
    recovered: bool = False  # to tag if a recovered role
    latest_observed_msg: Optional[Message] = None  # record the latest observed message when interrupted

    __hash__ = object.__hash__  # support Role as hashable type in `Environment.members`
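  1. model_config: a ConfigDict that configures the model. arbitrary_types_allowed=True permits field types that pydantic cannot validate natively, and exclude=["llm"] is meant to keep llm out of serialization (the llm field itself is also declared with exclude=True).

  2. name, profile, goal, constraints, desc, is_human: the role's name, profile, goal, constraints, description, and whether it is played by a human. The first five are strings that default to "", and is_human is a bool that defaults to False.

  3. llm: each role owns its own BaseLLM instance so that it can use its own system message. It is created by default_factory=LLM and excluded from serialization.

  4. role_id, states, actions, rc, subscription, recovered, latest_observed_msg: the role's ID, list of state descriptions, list of actions, runtime context (RoleContext), set of subscribed tags, a flag marking a recovered role, and the latest message observed before an interruption. All of them have default values.

  5. __hash__: set to object.__hash__, which hashes by object identity. This makes Role instances hashable, so they can be used as set elements or dict keys, as Environment.members requires.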
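Why the explicit __hash__ matters can be shown without pydantic. In Python, a class that defines __eq__ without defining __hash__ becomes unhashable, and pydantic models define __eq__. The two classes below are plain stand-ins invented for this sketch.

```python
class ByValue:
    """Defines __eq__ only, so Python sets __hash__ to None for this class."""

    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        return isinstance(other, ByValue) and self.name == other.name


class ByValueHashable(ByValue):
    # Restore identity-based hashing, as Role does.
    __hash__ = object.__hash__


try:
    hash(ByValue("alice"))
    unhashable = False
except TypeError:
    unhashable = True

members = {ByValueHashable("alice"), ByValueHashable("bob")}
```

With identity-based hashing, two distinct objects are treated as distinct set members even if they compare equal, which is the behaviour one would want for live role objects.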

    @model_validator(mode="after")
    def check_subscription(self):
        if not self.subscription:
            self.subscription = {any_to_str(self), self.name} if self.name else {any_to_str(self)}
        return self

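After validation, this checks whether the role's subscription is empty and, if so, fills in a default. The string form of the role (any_to_str(self)) is always included, and the role's name is added as well when it is not empty.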

__init__

    def __init__(self, **data: Any):
        # --- avoid PydanticUndefinedAnnotation name 'Environment' is not defined #
        from metagpt.environment import Environment

        Environment
        # ------
        Role.model_rebuild()
        super().__init__(**data)

        self.llm.system_prompt = self._get_prefix()
        self._watch(data.get("watch") or [UserRequirement])

model_rebuild (inherited from pydantic's BaseModel)

    @classmethod
    def model_rebuild(
        cls,
        *,
        force: bool = False,
        raise_errors: bool = True,
        _parent_namespace_depth: int = 2,
        _types_namespace: dict[str, Any] | None = None,
    ) -> bool | None:
        """Try to rebuild the pydantic-core schema for the model.

        This may be necessary when one of the annotations is a ForwardRef which could not be resolved during
        the initial attempt to build the schema, and automatic rebuilding fails.

        Args:
            force: Whether to force the rebuilding of the model schema, defaults to `False`.
            raise_errors: Whether to raise errors, defaults to `True`.
            _parent_namespace_depth: The depth level of the parent namespace, defaults to 2.
            _types_namespace: The types namespace, defaults to `None`.

        Returns:
            Returns `None` if the schema is already "complete" and rebuilding was not required.
            If rebuilding _was_ required, returns `True` if rebuilding was successful, otherwise `False`.
        """
        if not force and cls.__pydantic_complete__:
            return None
        else:
            if '__pydantic_core_schema__' in cls.__dict__:
                delattr(cls, '__pydantic_core_schema__')  # delete cached value to ensure full rebuild happens
            if _types_namespace is not None:
                types_namespace: dict[str, Any] | None = _types_namespace.copy()
            else:
                if _parent_namespace_depth > 0:
                    frame_parent_ns = _typing_extra.parent_frame_namespace(parent_depth=_parent_namespace_depth) or {}
                    cls_parent_ns = (
                        _model_construction.unpack_lenient_weakvaluedict(cls.__pydantic_parent_namespace__) or {}
                    )
                    types_namespace = {**cls_parent_ns, **frame_parent_ns}
                    cls.__pydantic_parent_namespace__ = _model_construction.build_lenient_weakvaluedict(types_namespace)
                else:
                    types_namespace = _model_construction.unpack_lenient_weakvaluedict(
                        cls.__pydantic_parent_namespace__
                    )

                types_namespace = _typing_extra.get_cls_types_namespace(cls, types_namespace)

            # manually override defer_build so complete_model_class doesn't skip building the model again
            config = {**cls.model_config, 'defer_build': False}
            return _model_construction.complete_model_class(
                cls,
                cls.__name__,
                _config.ConfigWrapper(config, check=False),
                raise_errors=raise_errors,
                types_namespace=types_namespace,
            )
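Role.__init__ imports Environment and then calls Role.model_rebuild() so that pydantic can resolve the "Environment" forward reference used in RoleContext; without this, a PydanticUndefinedAnnotation error is raised. The parameters of model_rebuild are:

  • force: a bool that defaults to False. When True, the schema is rebuilt even if it is already complete.

  • raise_errors: a bool that defaults to True. When True, errors during rebuilding are raised; when False, a failed rebuild is reported by returning False instead.

  • _parent_namespace_depth: the depth of the parent namespace, defaults to 2. It is used to locate the caller's frame when building the types namespace.

  • _types_namespace: a dict that supplies the types namespace explicitly. It defaults to None, in which case the namespace is collected from the parent frame and the class.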
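The underlying problem, a string annotation that cannot be resolved until the referenced class exists, can be reproduced with the standard library alone. The sketch below uses typing.get_type_hints as an analogy; it is not how pydantic implements model_rebuild, and both class names are invented for the example.

```python
from typing import get_type_hints


class RoleContextSketch:
    # String annotation: the name is not resolved when the class is created.
    env: "EnvironmentSketch"


def try_resolve(namespace: dict):
    """Return the resolved hints, or None while the forward reference is unknown."""
    try:
        return get_type_hints(RoleContextSketch, globalns={}, localns=namespace)
    except NameError:
        return None


before = try_resolve({})  # EnvironmentSketch is not available yet


class EnvironmentSketch:
    pass


# Once the class exists and is supplied in the namespace, resolution succeeds.
after = try_resolve({"EnvironmentSketch": EnvironmentSketch})
```

Importing Environment inside __init__ and then rebuilding plays the same role: it makes the name available at the moment the annotations are resolved.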

    def _get_prefix(self):
        """Get the role prefix"""
        if self.desc:
            return self.desc

        prefix = PREFIX_TEMPLATE.format(**{"profile": self.profile, "name": self.name, "goal": self.goal})

        if self.constraints:
            prefix += CONSTRAINT_TEMPLATE.format(**{"constraints": self.constraints})

        if self.rc.env and self.rc.env.desc:
            other_role_names = ", ".join(self.rc.env.role_names())
            env_desc = f"You are in {self.rc.env.desc} with roles({other_role_names})."
            prefix += env_desc
        return prefix
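  1. If the role's description (desc) is not empty, return it directly.
  2. Otherwise, format the role's profile, name, and goal into PREFIX_TEMPLATE to build the prefix string.
  3. If the role has constraints, append them using CONSTRAINT_TEMPLATE.
  4. If the role has an environment (rc.env) and that environment has a description (env.desc), append the environment description together with the names of the roles in it.
  5. Return the resulting prefix.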
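The steps above can be sketched as a stand-alone function. The wording of the two templates below is an assumption made for illustration; the real PREFIX_TEMPLATE and CONSTRAINT_TEMPLATE constants are defined in MetaGPT and may differ. The function build_prefix and its parameters are likewise hypothetical.

```python
# Assumed template wording, for illustration only.
PREFIX_TEMPLATE = "You are a {profile}, named {name}, your goal is {goal}. "
CONSTRAINT_TEMPLATE = "the constraint is {constraints}. "


def build_prefix(profile, name, goal, constraints="", desc="", env_desc="", role_names=()):
    """Build a system prompt prefix following the same order of checks."""
    if desc:
        return desc  # an explicit description overrides everything else
    prefix = PREFIX_TEMPLATE.format(profile=profile, name=name, goal=goal)
    if constraints:
        prefix += CONSTRAINT_TEMPLATE.format(constraints=constraints)
    if env_desc:
        prefix += f"You are in {env_desc} with roles({', '.join(role_names)})."
    return prefix


plain = build_prefix("Engineer", "Alex", "write code")
overridden = build_prefix("Engineer", "Alex", "write code", desc="Custom prompt")
```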

    def serialize(self, stg_path: Path = None):
        stg_path = (
            SERDESER_PATH.joinpath(f"team/environment/roles/{self.__class__.__name__}_{self.name}")
            if stg_path is None
            else stg_path
        )

        role_info = self.model_dump(exclude={"rc": {"memory": True, "msg_buffer": True}, "llm": True})
        role_info.update({"role_class": self.__class__.__name__, "module_name": self.__module__})
        role_info_path = stg_path.joinpath("role_info.json")
        write_json_file(role_info_path, role_info)

        self.rc.memory.serialize(stg_path)  # serialize role's memory alone

    @classmethod
    def deserialize(cls, stg_path: Path) -> "Role":
        """stg_path = ./storage/team/environment/roles/{role_class}_{role_name}"""
        role_info_path = stg_path.joinpath("role_info.json")
        role_info = read_json_file(role_info_path)

        role_class_str = role_info.pop("role_class")
        module_name = role_info.pop("module_name")
        role_class = import_class(class_name=role_class_str, module_name=module_name)

        role = role_class(**role_info)  # initiate particular Role
        role.set_recovered(True)  # set True to make a tag

        role_memory = Memory.deserialize(stg_path)
        role.set_memory(role_memory)

        return role

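serialize writes the role to disk and deserialize rebuilds it from disk. serialize dumps the model to role_info.json, leaving out llm as well as rc.memory and rc.msg_buffer, records the class name and module name so that the right subclass can be imported again later, and then serializes the memory separately. deserialize reads role_info.json, imports the class, instantiates it, marks it as recovered, and restores its memory.

Serializing a role to a file provides persistence and allows the role's data to be shared between different platforms.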
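A minimal round trip in the same spirit is sketched below, using only the standard library. MiniRole, Engineer, and the registry argument are invented for this example; the registry is a simplified stand-in for import_class, and memory handling is left out.

```python
import json
import tempfile
from pathlib import Path


class MiniRole:
    def __init__(self, name="", profile="", recovered=False):
        self.name = name
        self.profile = profile
        self.recovered = recovered

    def serialize(self, stg_path: Path) -> None:
        stg_path.mkdir(parents=True, exist_ok=True)
        info = {"name": self.name, "profile": self.profile}
        # Record the class name so the right subclass can be rebuilt later.
        info["role_class"] = self.__class__.__name__
        (stg_path / "role_info.json").write_text(json.dumps(info), encoding="utf-8")

    @classmethod
    def deserialize(cls, stg_path: Path, registry: dict) -> "MiniRole":
        info = json.loads((stg_path / "role_info.json").read_text(encoding="utf-8"))
        role_class = registry[info.pop("role_class")]  # stand-in for import_class
        role = role_class(**info)
        role.recovered = True  # tag the instance as restored from disk
        return role


class Engineer(MiniRole):
    pass


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "Engineer_Alex"
    Engineer(name="Alex", profile="Engineer").serialize(path)
    restored = MiniRole.deserialize(path, {"Engineer": Engineer})
```

Storing the class name next to the data is what lets a generic deserialize call return an instance of the specific subclass.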

_init_actions

    def _init_actions(self, actions):
        self._reset()
        for idx, action in enumerate(actions):
            if not isinstance(action, Action):
                # default initialization: instantiate the Action class with this role's llm
                i = action(name="", llm=self.llm)
            else:
                if self.is_human and not isinstance(action.llm, HumanProvider):
                    logger.warning(
                        f"is_human attribute does not take effect, "
                        f"as Role's {str(action)} was initialized using LLM, "
                        f"try passing in Action classes instead of initialized instances"
                    )
                i = action
            self._init_action_system_message(i)
            self.actions.append(i)
            self.states.append(f"{idx}. {action}")

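For each entry, the method first checks whether it is already an Action instance. If it is not (that is, an Action class was passed in), the class is instantiated with an empty name and the role's own llm (its large language model). If it is already an instance, it is used as is, and a warning is logged when is_human is set but the instance was not created with a HumanProvider.

The initialized action object is appended to the role's actions list.

A description of the action is appended to the role's states list, in the form "index. action".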
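The class-versus-instance handling can be sketched as follows. Action, WriteCode, WriteTest, and init_actions are simplified stand-ins written for this example. Unlike the original, the sketch formats the state string from the instance so that the output is easy to read.

```python
class Action:
    def __init__(self, name="", llm=None):
        self.name = name
        self.llm = llm

    def __str__(self):
        return self.__class__.__name__


class WriteCode(Action):
    pass


class WriteTest(Action):
    pass


def init_actions(actions, llm):
    """Accept Action classes or instances; return (instances, state labels)."""
    instances, states = [], []
    for idx, action in enumerate(actions):
        if not isinstance(action, Action):
            obj = action(name="", llm=llm)  # a class was passed: instantiate it
        else:
            obj = action                    # already an instance: keep it as is
        instances.append(obj)
        states.append(f"{idx}. {obj}")
    return instances, states


prebuilt = WriteTest(name="tests", llm="own-llm")
instances, states = init_actions([WriteCode, prebuilt], llm="shared-llm")
```

Passing classes rather than instances lets the role inject its own llm, which is why the warning in the original recommends doing so for human roles.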

    def _set_react_mode(self, react_mode: str, max_react_loop: int = 1):
        """Set strategy of the Role reacting to observed Message. Variation lies in how
        this Role elects action to perform during the _think stage, especially if it is capable of multiple Actions.

        Args:
            react_mode (str): Mode for choosing action during the _think stage, can be one of:
                        "react": standard think-act loop in the ReAct paper, alternating thinking and acting to solve the task, i.e. _think -> _act -> _think -> _act -> ...
                                 Use llm to select actions in _think dynamically;
                        "by_order": switch action each time by order defined in _init_actions, i.e. _act (Action1) -> _act (Action2) -> ...;
                        "plan_and_act": first plan, then execute an action sequence, i.e. _think (of a plan) -> _act -> _act -> ...
                                        Use llm to come up with the plan dynamically.
                        Defaults to "react".
            max_react_loop (int): Maximum react cycles to execute, used to prevent the agent from reacting forever.
                                  Take effect only when react_mode is react, in which we use llm to choose actions, including termination.
                                  Defaults to 1, i.e. _think -> _act (-> return result and end)
        """
        assert react_mode in RoleReactMode.values(), f"react_mode must be one of {RoleReactMode.values()}"
        self.rc.react_mode = react_mode
        if react_mode == RoleReactMode.REACT:
            self.rc.max_react_loop = max_react_loop

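This method sets the strategy by which the role reacts to observed messages. The strategy determines how the role chooses the action to perform, and the available modes are:

  • react: alternates between thinking and acting to solve the task. The LLM (llm) dynamically selects the action during the think stage, and each cycle consists of one think step and one act step. The max_react_loop parameter caps the number of cycles so that the role cannot react forever. It defaults to 1, meaning a single think-act cycle, after which the result is returned.

  • by_order: switches to the next action each time, following the order defined in the list of initialized actions.

  • plan_and_act: plans first, then executes a sequence of actions. The role comes up with a plan in the think stage, using the LLM (llm) to generate it dynamically, and then executes the actions in that plan.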

_watch

    def _watch(self, actions: Iterable[Type[Action]] | Iterable[Action]):
        """Watch Actions of interest. Role will select Messages caused by these Actions from its personal message
        buffer during _observe.
        """
        self.rc.watch = {any_to_str(t) for t in actions}
        # check RoleContext after adding watch actions
        self.rc.check(self.role_id)

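_watch records the Actions the role is interested in: rc.watch becomes the set of their string names. During _observe, the role picks the messages caused by these Actions out of its personal message buffer.

After updating the watch set, the method calls self.rc.check(self.role_id). In this version check is a no-op (its long-term memory recovery code is commented out), so the call currently has no effect.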


    def is_watch(self, caused_by: str):
        return caused_by in self.rc.watch

    def subscribe(self, tags: Set[str]):
        """Used to receive Messages with certain tags from the environment. Message will be put into personal message
        buffer to be further processed in _observe. By default, a Role subscribes Messages with a tag of its own name
        or profile.
        """
        self.subscription = tags
        if self.rc.env:  # According to the routing feature plan in Chapter 2.2.3.2 of RFC 113
            self.rc.env.set_subscription(self, self.subscription)

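is_watch checks whether a message was caused by an action the role is interested in.

subscribe sets the tags of the messages the role wants to receive from the environment. Matching messages are put into the role's personal message buffer to be processed further in _observe. According to the docstring, a role by default subscribes to messages tagged with its own name or profile.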

_set_state

    def _set_state(self, state: int):
        """Update the current state."""
        self.rc.state = state
        logger.debug(f"actions={self.actions}, state={state}")
        self.rc.todo = self.actions[self.rc.state] if state >= 0 else None

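When state is greater than or equal to 0, the action at that index becomes the pending todo; otherwise (state is -1) todo is reset to None.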


    def set_env(self, env: "Environment"):
        """Set the environment in which the role works. The role can talk to the environment and can also receive
        messages by observing."""
        self.rc.env = env
        if env:
            env.set_subscription(self, self.subscription)
            self.refresh_system_message()  # add env message to system message

    @property
    def action_count(self):
        """Return number of action"""
        return len(self.actions)

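set_env stores the given environment in the role context as self.rc.env. If the environment is not None, it calls the environment's set_subscription method to register the role's subscription, and then calls refresh_system_message to add the environment information to the system message.

action_count returns the number of actions the role has.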

_think

    async def _think(self) -> bool:
        """Consider what to do and decide on the next course of action. Return false if nothing can be done."""
        if len(self.actions) == 1:
            # If there is only one action, then only this one can be performed
            self._set_state(0)

            return True

        if self.recovered and self.rc.state >= 0:
            self._set_state(self.rc.state)  # action to run from recovered state
            self.set_recovered(False)  # avoid max_react_loop out of work
            return True

        prompt = self._get_prefix()
        prompt += STATE_TEMPLATE.format(
            history=self.rc.history,
            states="\n".join(self.states),
            n_states=len(self.states) - 1,
            previous_state=self.rc.state,
        )

        next_state = await self.llm.aask(prompt)
        next_state = extract_state_value_from_output(next_state)
        logger.debug(f"{prompt=}")

        if (not next_state.isdigit() and next_state != "-1") or int(next_state) not in range(-1, len(self.states)):
            logger.warning(f"Invalid answer of state, {next_state=}, will be set to -1")
            next_state = -1
        else:
            next_state = int(next_state)
            if next_state == -1:
                logger.info(f"End actions with {next_state=}")
        self._set_state(next_state)
        return True

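The method first checks whether the role has exactly one action. If so, it sets the state to 0 and returns True.

If the role has been recovered and its state is greater than or equal to 0, the method re-applies that state so that the role resumes from it, and clears the recovered flag so that max_react_loop keeps working.

(A recovered role is one whose previous run was interrupted, for example by an unexpected interruption or a system failure. Such a role should continue with the action it was about to perform rather than choose a new one.)

Otherwise, it builds a prompt from the role's prefix, the message history, the list of available states, and the previous state, and asks the large language model (LLM) for the next state through llm.aask. If the extracted value is not a valid number or is outside the legal range, it is set to -1.

Finally, it calls _set_state with the resulting value and returns True to indicate that thinking and deciding are done.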
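The validation of the model's answer can be isolated into a small function. The sketch below follows the same range check but, as a deliberate simplification, uses int() with a try/except instead of the isdigit test in the original. The name parse_next_state is hypothetical.

```python
def parse_next_state(answer: str, n_states: int) -> int:
    """Map a raw LLM answer to a state index, falling back to -1 when invalid."""
    try:
        state = int(answer.strip())
    except ValueError:
        return -1  # not a number at all
    # Legal values are -1 (stop) and 0 .. n_states - 1.
    return state if state in range(-1, n_states) else -1


valid = parse_next_state("1", 3)
stop = parse_next_state("-1", 3)
garbage = parse_next_state("I think state two", 3)
out_of_range = parse_next_state("3", 3)
```

Falling back to -1 means that an unusable answer ends the reaction instead of running an arbitrary action.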

_act

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
        response = await self.rc.todo.run(self.rc.history)
        if isinstance(response, (ActionOutput, ActionNode)):
            msg = Message(
                content=response.content,
                instruct_content=response.instruct_content,
                role=self._setting,
                cause_by=self.rc.todo,
                sent_from=self,
            )
        elif isinstance(response, Message):
            msg = response
        else:
            msg = Message(content=response, role=self.profile, cause_by=self.rc.todo, sent_from=self)
        self.rc.memory.add(msg)

        return msg

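_act runs the pending action (rc.todo) on the role's history, wraps the result in a Message unless it already is one, adds the Message to the role's memory, and returns it.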

_observe

    async def _observe(self, ignore_memory=False) -> int:
        """Prepare new messages for processing from the message buffer and other sources."""
        # Read unprocessed messages from the msg buffer.
        news = []
        if self.recovered:
            news = [self.latest_observed_msg] if self.latest_observed_msg else []
        if not news:
            news = self.rc.msg_buffer.pop_all()
        # Store the read messages in your own memory to prevent duplicate processing.
        old_messages = [] if ignore_memory else self.rc.memory.get()
        self.rc.memory.add_batch(news)
        # Filter out messages of interest.
        self.rc.news = [
            n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
        ]
        self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None  # record the latest observed msg

        # Design Rules:
        # If you need to further categorize Message objects, you can do so using the Message.set_meta function.
        # msg_buffer is a receiving buffer, avoid adding message data and operations to msg_buffer.
        news_text = [f"{i.role}: {i.content[:20]}..." for i in self.rc.news]
        if news_text:
            logger.debug(f"{self._setting} observed: {news_text}")
        return len(self.rc.news)

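The method first checks whether the role has been recovered. If so, and a latest observed message was recorded, that message becomes the list of messages to process. If the list is still empty, all unprocessed messages are popped from the message buffer.

It then takes a snapshot of the existing memory (unless ignore_memory is set) and adds the messages it has just read to the role's memory, to prevent duplicate processing.

Next, it keeps only the messages of interest, namely those caused by an action the role watches or addressed to the role by name, and that are not already in the old memory. The result is stored in self.rc.news.

Finally, it records the latest observed message and returns the number of new messages observed.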
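The filtering step can be sketched on its own. Msg is a simplified frozen dataclass written for this example, not MetaGPT's Message, and filter_news is a hypothetical helper that mirrors the list comprehension in _observe.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Msg:
    content: str
    cause_by: str
    send_to: frozenset = field(default_factory=frozenset)


def filter_news(news, watch, name, old_messages):
    """Keep messages that are watched or addressed to `name`, and not seen before."""
    return [
        n for n in news
        if (n.cause_by in watch or name in n.send_to) and n not in old_messages
    ]


prd = Msg("prd", "WritePRD")
chat = Msg("small talk", "Chat")
direct = Msg("hi Alex", "Chat", frozenset({"Alex"}))

fresh = filter_news([prd, chat, direct], {"WritePRD"}, "Alex", old_messages=[])
after_seen = filter_news([prd, chat, direct], {"WritePRD"}, "Alex", old_messages=[prd])
```

The snapshot of old messages has to be taken before the new batch is added to memory; otherwise every new message would be filtered out as already seen.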


    def publish_message(self, msg):
        """If the role belongs to env, then the role's messages will be broadcast to env"""
        if not msg:
            return
        if not self.rc.env:
            # If env does not exist, do not publish the message
            return
        self.rc.env.publish_message(msg)

    def put_message(self, message):
        """Place the message into the Role object's private message buffer."""
        if not message:
            return
        self.rc.msg_buffer.push(message)

_react

    async def _react(self) -> Message:
        """Think first, then act, until the Role _think it is time to stop and requires no more todo.
        This is the standard think-act loop in the ReAct paper, which alternates thinking and acting in task solving, i.e. _think -> _act -> _think -> _act -> ...
        Use llm to select actions in _think dynamically
        """
        actions_taken = 0
        rsp = Message(content="No actions taken yet", cause_by=Action)  # will be overwritten after Role _act
        while actions_taken < self.rc.max_react_loop:
            # think
            await self._think()
            if self.rc.todo is None:
                break
            # act
            logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")
            rsp = await self._act()  # does this rsp need publish_message?
            actions_taken += 1
        return rsp  # return output from the last action

_think -> _act -> _think -> _act -> ...

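In each iteration the role first calls _think to decide on the next step. If there is something to do (self.rc.todo is not None), it calls _act, stores the result in rsp, and increments the count of actions taken. The loop ends when _think leaves todo as None or when max_react_loop actions have been taken.

As for the question in the code comment, whether rsp needs publish_message: _react itself does not publish anything. Role.run publishes only the final rsp returned by react, so intermediate results stay in the role's own memory unless _act is overridden to publish them. Whether that is needed depends on the use case, namely on whether other roles or the environment have to be notified of those intermediate results.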
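The loop and its two exit conditions can be tried out with a scripted stand-in for the LLM. LoopRole and run_loop are invented for this sketch; the plan list replaces the model's choices, and plain strings replace Message objects.

```python
import asyncio


class LoopRole:
    def __init__(self, plan, max_react_loop=1):
        self.plan = list(plan)  # scripted stand-in for the LLM's choices
        self.max_react_loop = max_react_loop
        self.todo = None
        self.executed = []

    async def _think(self):
        # Pick the next scripted action, or None when nothing is left to do.
        self.todo = self.plan.pop(0) if self.plan else None

    async def _act(self):
        self.executed.append(self.todo)
        return f"done: {self.todo}"

    async def _react(self):
        actions_taken = 0
        rsp = "No actions taken yet"
        while actions_taken < self.max_react_loop:
            await self._think()
            if self.todo is None:
                break  # the role decided to stop
            rsp = await self._act()
            actions_taken += 1
        return rsp  # output of the last action


def run_loop(plan, max_react_loop):
    role = LoopRole(plan, max_react_loop)
    rsp = asyncio.run(role._react())
    return rsp, role.executed
```

Calling run_loop(["a", "b", "c"], 2) stops after two actions because of the cap, while run_loop(["a"], 5) stops early because the think step finds nothing more to do.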

_act_by_order

    async def _act_by_order(self) -> Message:
        """switch action each time by order defined in _init_actions, i.e. _act (Action1) -> _act (Action2) -> ..."""
        start_idx = self.rc.state if self.rc.state >= 0 else 0  # action to run from recovered state
        rsp = Message(content="No actions taken yet")  # return default message if actions=[]
        for i in range(start_idx, len(self.states)):
            self._set_state(i)
            rsp = await self._act()
        return rsp  # return output from the last action

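The role executes its actions in the order defined in _init_actions. If self.rc.state is already greater than or equal to 0 (as for a recovered role), execution resumes from that state; otherwise it starts from the first action.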

_plan_and_act

    async def _plan_and_act(self) -> Message:
        """first plan, then execute an action sequence, i.e. _think (of a plan) -> _act -> _act -> ... Use llm to come up with the plan dynamically."""
        # TODO: to be implemented
        return Message(content="")

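This is only a placeholder interface. It is marked TODO in this version and simply returns an empty Message, so users have to supply their own implementation.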

react

    async def react(self) -> Message:
        """Entry to one of three strategies by which Role reacts to the observed Message"""
        if self.rc.react_mode == RoleReactMode.REACT:
            rsp = await self._react()
        elif self.rc.react_mode == RoleReactMode.BY_ORDER:
            rsp = await self._act_by_order()
        elif self.rc.react_mode == RoleReactMode.PLAN_AND_ACT:
            rsp = await self._plan_and_act()
        self._set_state(state=-1)  # current reaction is complete, reset state to -1 and todo back to None
        return rsp

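react dispatches to one of the three strategies according to rc.react_mode, then resets the state to -1 (and todo to None) once the reaction is complete.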

get_memories

    def get_memories(self, k=0) -> list[Message]:
        """A wrapper to return the most recent k memories of this role, return all when k=0"""
        return self.rc.memory.get(k=k)

run

    async def run(self, with_message=None) -> Message | None:
        """Observe, and think and act based on the results of the observation"""
        if with_message:
            msg = None
            if isinstance(with_message, str):
                msg = Message(content=with_message)
            elif isinstance(with_message, Message):
                msg = with_message
            elif isinstance(with_message, list):
                msg = Message(content="\n".join(with_message))
            if not msg.cause_by:
                msg.cause_by = UserRequirement
            self.put_message(msg)

        if not await self._observe():
            # If there is no new information, suspend and wait
            logger.debug(f"{self._setting}: no news. waiting.")
            return

        rsp = await self.react()

        # Reset the next action to be taken.
        self.rc.todo = None
        # Send the response message to the Environment object to have it relay the message to the subscribers.
        self.publish_message(rsp)
        return rsp

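If with_message is given, run first converts it to a Message (a str becomes the content, a list of strings is joined with newlines, a Message is used as is), sets cause_by to UserRequirement when it is missing, and puts the message into the role's message buffer. The role then calls _observe to pick up new messages.

If nothing new was observed, run logs that it is waiting and returns None right away. Otherwise it calls react, clears rc.todo, and passes the response to publish_message so that the environment can relay it to subscribers.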
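The input normalization at the start of run can be sketched separately. Msg, USER_REQUIREMENT, and to_message are stand-ins invented for this example. As a deliberate difference from the quoted code, the sketch raises a TypeError for unsupported input types.

```python
from dataclasses import dataclass


@dataclass
class Msg:
    """Stand-in for Message with only the fields needed here."""
    content: str
    cause_by: str = ""


USER_REQUIREMENT = "UserRequirement"  # stand-in for the UserRequirement action


def to_message(with_message):
    """Convert a str, a list of str, or a Msg into a Msg with cause_by set."""
    if isinstance(with_message, Msg):
        msg = with_message
    elif isinstance(with_message, str):
        msg = Msg(content=with_message)
    elif isinstance(with_message, list):
        msg = Msg(content="\n".join(with_message))
    else:
        raise TypeError(f"unsupported type: {type(with_message).__name__}")
    if not msg.cause_by:
        msg.cause_by = USER_REQUIREMENT  # default origin is the user
    return msg


from_str = to_message("build a snake game")
from_list = to_message(["line one", "line two"])
from_msg = to_message(Msg("keep me", cause_by="WritePRD"))
```

Defaulting cause_by to the user requirement matters because a role that watches UserRequirement will then pick the message up in _observe.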


    @property
    def is_idle(self) -> bool:
        """If true, all actions have been executed."""
        return not self.rc.news and not self.rc.todo and self.rc.msg_buffer.empty()

    async def think(self) -> Action:
        """The exported `think` function"""
        await self._observe()
        await self._think()
        return self.rc.todo

    async def act(self) -> ActionOutput:
        """The exported `act` function"""
        msg = await self._act()
        return ActionOutput(content=msg.content, instruct_content=msg.instruct_content)

todo

    @property
    def todo(self) -> str:
        """
        AgentStore uses this attribute to display to the user what actions the current role should take.
        """
        if self.rc.todo:
            if self.rc.todo.desc:
                return self.rc.todo.desc
            return any_to_name(self.rc.todo)
        if self.actions:
            return any_to_name(self.actions[0])
        return ""

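If rc.todo is set, the property returns its description when there is one and its name otherwise. If rc.todo is not set but the role has actions, it returns the name of the first action. If the role has no actions at all, it returns an empty string.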