File size: 2,723 Bytes
8332c01 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
import datetime
import requests
class Trigger:
def __init__(self, trigger_tags, comparison_tags, time_definition, event_name, included=True):
self.trigger_tags = set(trigger_tags)
self.comparison_tags = set(comparison_tags)
self.time_definition = time_definition
self.event_name = event_name
self.included = included
self.threshold = 0
self.actions = []
self.sources = []
def add_action(self, action):
self.actions.append(action)
def remove_action(self, action):
if action in self.actions:
self.actions.remove(action)
else:
print("Action not found")
def add_source(self, source):
self.sources.append(source)
def remove_source(self, source):
if source in self.sources:
self.sources.remove(source)
else:
print("Source not found")
def check_trigger(self, current_tags, current_time):
if self.included:
if current_time in self.time_definition and self.trigger_tags.issubset(current_tags):
self.threshold += 1
else:
self.threshold = 0
else:
if current_time in self.time_definition and not self.trigger_tags.intersection(current_tags):
self.threshold += 1
else:
self.threshold = 0
if self.threshold >= len(self.time_definition):
self.fire_actions()
self.make_requests()
def fire_actions(self):
for action in self.actions:
action(self.event_name)
def make_requests(self):
for source in self.sources:
try:
response = requests.get(source)
# Procesar la respuesta aqu铆 si es necesario
print(f"Request made to {source}. Status code: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"Error making request to {source}: {e}")
# Ejemplo de uso:
def action_function(event_name):
print(f"Trigger fired for event: {event_name}")
if __name__ == "__main__":
# Definici贸n de un trigger
trigger = Trigger(["tag1", "tag2"], ["tag3", "tag4"], [datetime.time(10, 0), datetime.time(15, 0)], "Event1")
# A帽adir una acci贸n al trigger
trigger.add_action(action_function)
# A帽adir una fuente al trigger
trigger.add_source("https://example.com/api/data")
# Simular la comprobaci贸n peri贸dica del trigger (aqu铆 se usar铆a en un bucle de tiempo real)
current_tags = {"tag1", "tag2", "tag3"}
current_time = datetime.datetime.now().time()
trigger.check_trigger(current_tags, current_time)
|