Spaces:
Sleeping
Sleeping
import sys | |
import logging | |
import ray | |
from collections import deque | |
from typing import Dict, List | |
from pathlib import Path | |
sys.path.append(str(Path(__file__).resolve().parent.parent)) | |
from extensions.ray_objectives import CooperativeObjectivesListStorage | |
try: | |
ray.init(address="auto", namespace="babyagi", logging_level=logging.FATAL, ignore_reinit_error=True) | |
except: | |
ray.init(namespace="babyagi", logging_level=logging.FATAL, ignore_reinit_error=True) | |
class CooperativeTaskListStorageActor: | |
def __init__(self): | |
self.tasks = deque([]) | |
self.task_id_counter = 0 | |
def append(self, task: Dict): | |
self.tasks.append(task) | |
def replace(self, tasks: List[Dict]): | |
self.tasks = deque(tasks) | |
def popleft(self): | |
return self.tasks.popleft() | |
def is_empty(self): | |
return False if self.tasks else True | |
def next_task_id(self): | |
self.task_id_counter += 1 | |
return self.task_id_counter | |
def get_task_names(self): | |
return [t["task_name"] for t in self.tasks] | |
class CooperativeTaskListStorage: | |
def __init__(self, name: str): | |
self.name = name | |
try: | |
self.actor = ray.get_actor(name=self.name, namespace="babyagi") | |
except ValueError: | |
self.actor = CooperativeTaskListStorageActor.options(name=self.name, namespace="babyagi", lifetime="detached").remote() | |
objectives = CooperativeObjectivesListStorage() | |
objectives.append(self.name) | |
def append(self, task: Dict): | |
self.actor.append.remote(task) | |
def replace(self, tasks: List[Dict]): | |
self.actor.replace.remote(tasks) | |
def popleft(self): | |
return ray.get(self.actor.popleft.remote()) | |
def is_empty(self): | |
return ray.get(self.actor.is_empty.remote()) | |
def next_task_id(self): | |
return ray.get(self.actor.next_task_id.remote()) | |
def get_task_names(self): | |
return ray.get(self.actor.get_task_names.remote()) | |