Jon Taylor
pipelines
70de1d6
raw
history blame
6.14 kB
import argparse
import queue
import time
import threading
import logging
import os
from PIL import Image
from daily import EventHandler, CallClient, Daily
from datetime import datetime
from dotenv import load_dotenv
from auth import get_meeting_token, get_room_name
from pipeline import Pipeline
from device import device, torch_dtype
load_dotenv()
class DailyVision(EventHandler):
def __init__(
self,
room_url,
room_name,
expiration,
idle,
bot_name="Daily Bot"
):
self.__client = CallClient(event_handler=self)
self.__pipeline = Pipeline
self.__camera = None
self.__time = time.time()
self.__queue = queue.Queue()
self.__app_quit = False
self.__bot_name = bot_name
self.__room_url = room_url
self.__room_name = room_name
self.__expiration = expiration
self.__idle = idle
# Create the pipeline (this might take a moment)
self.__pipeline = Pipeline(device, torch_dtype)
#print(self.__pipeline.InputParams.schema())
# Configure logger
FORMAT = f"%(asctime)s {self.__room_url} %(message)s"
logging.basicConfig(format=FORMAT)
self.logger = logging.getLogger("bot-instance")
self.logger.setLevel(logging.DEBUG)
self.logger.info(f"Expiration timer set to: {self.__expiration}")
def run(self, meeting_url, token):
# Join
self.logger.info(f"Connecting to room {meeting_url} as {self.__bot_name}")
self.__client.set_user_name(self.__bot_name)
self.__client.join(meeting_url, token, completion=self.on_joined)
#self.__participant_id = self.client.participants()["local"]["id"]
# Start thread
self.__thread = threading.Thread(target = self.process_frames)
self.__thread.start()
# Keep-alive on thread
self.__thread.join()
def leave(self):
self.logger.info(f"Leaving...")
self.__app_quit = True
self.__thread.join()
self.__client.leave()
def on_joined(self, join_data, client_error):
self.logger.info(f"call_joined: {join_data}, {client_error}")
def on_participant_joined(self, participant):
self.logger.info(f"Participant {participant['id']} joined, analyzing frames...")
self.__client.set_video_renderer(participant["id"], self.on_video_frame)
# Say hello
self.wave()
def setup_camera(self, video_frame):
if not self.__camera:
self.__camera = Daily.create_camera_device("camera",
width = video_frame.width,
height = video_frame.height,
color_format="RGBA")
self.__client.update_inputs({
"camera": {
"isEnabled": True,
"settings": {
"deviceId": "camera"
}
}
})
def process_frames(self):
while not self.__app_quit:
# Is anyone watching?
if not self.__idle and len(self.__client.participants()) < 2:
self.logger.info(f"No partcipants in channel. Exiting...")
self.__app_quit = True
break
# Check expiry timer
if time.time() > self.__expiration:
self.logger.info(f"Expiration timer exceeded. Exiting...")
self.__app_quit = True
break
try:
video_frame = self.__queue.get(timeout=5)
if video_frame:
image = Image.frombytes("RGBA", (video_frame.width, video_frame.height), video_frame.buffer)
#result = self.__pipeline(image)
#pil = Image.fromarray(result.render()[0], mode="RGB").tobytes()
self.__camera.write_frame(image.tobytes())
except queue.Empty:
pass
def on_video_frame(self, participant_id, video_frame):
# Process ~15 frames per second (considering incoming frames at 30fps).
if time.time() - self.__time > 0.05:
self.__time = time.time()
self.setup_camera(video_frame)
self.__queue.put(video_frame)
def wave(self, emoji="👋"):
self.__client.send_app_message(
{
"event": "sync-emoji-reaction",
"reaction": {
"emoji": emoji,
"room": "main-room",
"sessionId": "bot",
"id": time.time(),
},
}
)
def main():
parser = argparse.ArgumentParser(description="Daily Bot")
# Required args
parser.add_argument("-u", "--url", required=True, type=str, help="URL of the Daily room")
parser.add_argument("-k", "--api_key", required=True, type=str, help="Daily API key")
# Optional args
parser.add_argument("-t", "--private", type=bool, help="Is this room private?", default=True)
parser.add_argument("-n", "--bot-name", type=str, help="Name of the bot", default="Daily Bot")
parser.add_argument("-e", "--expiration", type=int, help="Duration of bot", default=os.getenv("BOT_MAX_DURATION", 300))
parser.add_argument("-i", "--idle", type=bool, help="Wait for participants to join", default=False)
args = parser.parse_args()
Daily.init()
expiration = time.time() + args.expiration
room_name = get_room_name(args.url)
# Retrieve a meeting token, if not provided
#@TODO do room lookup to check privacy
if args.private:
token = get_meeting_token(room_name, args.api_key, expiration)
app = DailyVision(args.url, room_name, expiration, args.idle, args.bot_name)
try :
app.run(args.url, token)
except KeyboardInterrupt:
print("Ctrl-C detected. Exiting!")
finally:
app.leave()
# Let leave finish
time.sleep(2)
if __name__ == '__main__':
main()