Spaces:
Sleeping
Sleeping
import datetime | |
import requests | |
class Trigger: | |
def __init__(self, trigger_tags, comparison_tags, time_definition, event_name, included=True): | |
self.trigger_tags = set(trigger_tags) | |
self.comparison_tags = set(comparison_tags) | |
self.time_definition = time_definition | |
self.event_name = event_name | |
self.included = included | |
self.threshold = 0 | |
self.actions = [] | |
self.sources = [] | |
def add_action(self, action): | |
self.actions.append(action) | |
def remove_action(self, action): | |
if action in self.actions: | |
self.actions.remove(action) | |
else: | |
print("Action not found") | |
def add_source(self, source): | |
self.sources.append(source) | |
def remove_source(self, source): | |
if source in self.sources: | |
self.sources.remove(source) | |
else: | |
print("Source not found") | |
def check_trigger(self, current_tags, current_time): | |
if self.included: | |
if current_time in self.time_definition and self.trigger_tags.issubset(current_tags): | |
self.threshold += 1 | |
else: | |
self.threshold = 0 | |
else: | |
if current_time in self.time_definition and not self.trigger_tags.intersection(current_tags): | |
self.threshold += 1 | |
else: | |
self.threshold = 0 | |
if self.threshold >= len(self.time_definition): | |
self.fire_actions() | |
self.make_requests() | |
def fire_actions(self): | |
for action in self.actions: | |
action(self.event_name) | |
def make_requests(self): | |
for source in self.sources: | |
try: | |
response = requests.get(source) | |
# Procesar la respuesta aqu铆 si es necesario | |
print(f"Request made to {source}. Status code: {response.status_code}") | |
except requests.exceptions.RequestException as e: | |
print(f"Error making request to {source}: {e}") | |
# Ejemplo de uso: | |
def action_function(event_name): | |
print(f"Trigger fired for event: {event_name}") | |
if __name__ == "__main__": | |
# Definici贸n de un trigger | |
trigger = Trigger(["tag1", "tag2"], ["tag3", "tag4"], [datetime.time(10, 0), datetime.time(15, 0)], "Event1") | |
# A帽adir una acci贸n al trigger | |
trigger.add_action(action_function) | |
# A帽adir una fuente al trigger | |
trigger.add_source("https://example.com/api/data") | |
# Simular la comprobaci贸n peri贸dica del trigger (aqu铆 se usar铆a en un bucle de tiempo real) | |
current_tags = {"tag1", "tag2", "tag3"} | |
current_time = datetime.datetime.now().time() | |
trigger.check_trigger(current_tags, current_time) | |