Spaces:
Sleeping
Sleeping
File size: 1,985 Bytes
14dc68f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
import sys
import logging
import ray
from collections import deque
from typing import Dict, List
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent.parent))
from extensions.ray_objectives import CooperativeObjectivesListStorage
try:
ray.init(address="auto", namespace="babyagi", logging_level=logging.FATAL, ignore_reinit_error=True)
except:
ray.init(namespace="babyagi", logging_level=logging.FATAL, ignore_reinit_error=True)
@ray.remote
class CooperativeTaskListStorageActor:
def __init__(self):
self.tasks = deque([])
self.task_id_counter = 0
def append(self, task: Dict):
self.tasks.append(task)
def replace(self, tasks: List[Dict]):
self.tasks = deque(tasks)
def popleft(self):
return self.tasks.popleft()
def is_empty(self):
return False if self.tasks else True
def next_task_id(self):
self.task_id_counter += 1
return self.task_id_counter
def get_task_names(self):
return [t["task_name"] for t in self.tasks]
class CooperativeTaskListStorage:
def __init__(self, name: str):
self.name = name
try:
self.actor = ray.get_actor(name=self.name, namespace="babyagi")
except ValueError:
self.actor = CooperativeTaskListStorageActor.options(name=self.name, namespace="babyagi", lifetime="detached").remote()
objectives = CooperativeObjectivesListStorage()
objectives.append(self.name)
def append(self, task: Dict):
self.actor.append.remote(task)
def replace(self, tasks: List[Dict]):
self.actor.replace.remote(tasks)
def popleft(self):
return ray.get(self.actor.popleft.remote())
def is_empty(self):
return ray.get(self.actor.is_empty.remote())
def next_task_id(self):
return ray.get(self.actor.next_task_id.remote())
def get_task_names(self):
return ray.get(self.actor.get_task_names.remote())
|