hu-po commited on
Commit
00ed2b4
1 Parent(s): d03c703

release 0.2

Browse files
src/elevenlabs.py CHANGED
@@ -19,6 +19,7 @@ log = logging.getLogger(__name__)
19
  try:
20
  USER = ElevenLabsUser(os.environ["ELEVENLABS_API_KEY"])
21
  except KeyError as e:
 
22
  log.warning("ELEVENLABS_API_KEY not found in environment variables.")
23
  pass
24
 
 
19
  try:
20
  USER = ElevenLabsUser(os.environ["ELEVENLABS_API_KEY"])
21
  except KeyError as e:
22
+ USER = None
23
  log.warning("ELEVENLABS_API_KEY not found in environment variables.")
24
  pass
25
 
src/src/elevenlabs.py DELETED
@@ -1,128 +0,0 @@
1
- import asyncio
2
- import io
3
- import logging
4
- import os
5
- import time
6
- from concurrent.futures import ThreadPoolExecutor
7
- from dataclasses import dataclass
8
- from typing import List, Union, Tuple
9
-
10
- import sounddevice as sd
11
- import soundfile as sf
12
- from elevenlabslib import ElevenLabsUser, ElevenLabsVoice
13
-
14
- from .utils import timeit
15
-
16
- logging.basicConfig(level=logging.INFO)
17
- log = logging.getLogger(__name__)
18
-
19
- try:
20
- USER = ElevenLabsUser(os.environ["ELEVENLABS_API_KEY"])
21
- except KeyError as e:
22
- USER = None
23
- log.warning("ELEVENLABS_API_KEY not found in environment variables.")
24
- pass
25
-
26
-
27
- @dataclass
28
- class Speaker:
29
- name: str
30
- voice: ElevenLabsVoice
31
- color: str
32
- description: str = None
33
-
34
-
35
- async def text_to_speechbytes_async(text, speaker, loop):
36
- with ThreadPoolExecutor() as executor:
37
- speech_bytes = await loop.run_in_executor(executor, text_to_speechbytes, text, speaker.voice)
38
- return speech_bytes
39
-
40
-
41
- async def play_history(history: List[Tuple[Speaker, str]]):
42
- loop = asyncio.get_event_loop()
43
-
44
- # Create a list of tasks for all text_to_speechbytes function calls
45
- tasks = [text_to_speechbytes_async(
46
- text, speaker, loop) for speaker, text in history]
47
-
48
- # Run tasks concurrently, waiting for the first one to complete
49
- for speech_bytes in await asyncio.gather(*tasks):
50
- audioFile = io.BytesIO(speech_bytes)
51
- soundFile = sf.SoundFile(audioFile)
52
- sd.play(soundFile.read(), samplerate=soundFile.samplerate, blocking=True)
53
-
54
-
55
- async def save_history(history: List[Tuple[Speaker, str]], audio_savepath: str):
56
- loop = asyncio.get_event_loop()
57
-
58
- # Create a list of tasks for all text_to_speechbytes function calls
59
- tasks = [text_to_speechbytes_async(
60
- text, speaker, loop) for speaker, text in history]
61
-
62
- # Run tasks concurrently, waiting for the first one to complete
63
- all_speech_bytes = await asyncio.gather(*tasks)
64
-
65
- # Combine all audio bytes into a single audio file
66
- concatenated_audio = io.BytesIO(b''.join(all_speech_bytes))
67
-
68
- # Save the combined audio file to disk
69
- with sf.SoundFile(concatenated_audio, mode='r') as soundFile:
70
- with sf.SoundFile(
71
- audio_savepath, mode='w',
72
- samplerate=soundFile.samplerate,
73
- channels=soundFile.channels,
74
- ) as outputFile:
75
- outputFile.write(soundFile.read())
76
-
77
-
78
- def check_voice_exists(voice: Union[ElevenLabsVoice, str]) -> Union[ElevenLabsVoice, None]:
79
- if USER is None:
80
- log.warning(
81
- "No ElevenLabsUser found, have you set the ELEVENLABS_API_KEY environment variable?")
82
- return None
83
- log.info(f"Getting voice {voice}...")
84
- _available_voices = USER.get_voices_by_name(voice)
85
- if _available_voices:
86
- log.info(f"Voice {voice} already exists, found {_available_voices}.")
87
- return _available_voices[0]
88
- return None
89
-
90
-
91
- @timeit
92
- def get_make_voice(voice: Union[ElevenLabsVoice, str], audio_path: List[str] = None) -> ElevenLabsVoice:
93
- if USER is None:
94
- log.warning(
95
- "No ElevenLabsUser found, have you set the ELEVENLABS_API_KEY environment variable?")
96
- return None
97
- _voice = check_voice_exists(voice)
98
- if _voice is not None:
99
- return _voice
100
- else:
101
- if USER.get_voice_clone_available():
102
- assert audio_path is not None, "audio_path must be provided"
103
- assert isinstance(audio_path, list), "audio_path must be a list"
104
- log.info(f"Cloning voice {voice}...")
105
- _audio_source_dict = {
106
- # Audio path is a PosixPath
107
- _.name: open(_, "rb").read() for _ in audio_path
108
- }
109
- newVoice = USER.clone_voice_bytes(voice, _audio_source_dict)
110
- return newVoice
111
- raise ValueError(
112
- f"Voice {voice} does not exist and cloning is not available.")
113
-
114
-
115
- @timeit
116
- def text_to_speech(text: str, voice: ElevenLabsVoice):
117
- log.info(f"Generating audio using voice {voice}...")
118
- time_start = time.time()
119
- voice.generate_and_play_audio(text, playInBackground=False)
120
- duration = time.time() - time_start
121
- return duration
122
-
123
-
124
- @timeit
125
- def text_to_speechbytes(text: str, voice: ElevenLabsVoice):
126
- log.info(f"Generating audio for voice {voice} text {text}...")
127
- audio_bytes = voice.generate_audio_bytes(text)
128
- return audio_bytes
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/src/openailib.py DELETED
@@ -1,52 +0,0 @@
1
- import logging
2
- import os
3
-
4
- from .utils import timeit
5
-
6
- import openai
7
-
8
- logging.basicConfig(level=logging.INFO)
9
- log = logging.getLogger(__name__)
10
-
11
- try:
12
- openai.api_key = os.getenv("OPENAI_API_KEY")
13
- except KeyError as e:
14
- log.warning("OPENAI_API_KEY not found in environment variables.")
15
- pass
16
-
17
-
18
- @timeit
19
- def speech_to_text(audio_path):
20
- log.info("Transcribing audio...")
21
- transcript = openai.Audio.transcribe("whisper-1", open(audio_path, "rb"))
22
- text = transcript["text"]
23
- log.info(f"Transcript: \n\t{text}")
24
- return text
25
-
26
-
27
- @timeit
28
- def top_response(prompt, system=None, model="gpt-3.5-turbo", max_tokens=20, temperature=0.8):
29
- _prompt = [
30
- {
31
- "role": "user",
32
- "content": prompt,
33
- },
34
- ]
35
- if system:
36
- _prompt = [
37
- {
38
- "role": "system",
39
- "content": system,
40
- },
41
- ] + _prompt
42
- log.info(f"API call to {model} with prompt: \n\n\t{_prompt}\n\n")
43
- _response = openai.ChatCompletion.create(
44
- model=model,
45
- messages=_prompt,
46
- temperature=temperature,
47
- n=1,
48
- max_tokens=max_tokens,
49
- )
50
- log.info(f"API reponse: \n\t{_response}")
51
- response: str = _response['choices'][0]['message']['content']
52
- return response
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/src/tube.py DELETED
@@ -1,64 +0,0 @@
1
- '''
2
- Extract audio from a YouTube video
3
-
4
- Usage:
5
- tube.py <url> <person> [-s <start_time>] [-d <duration>]
6
- '''
7
-
8
- import subprocess
9
- from pathlib import Path
10
- import datetime
11
- import argparse
12
- import os
13
- from pytube import YouTube
14
-
15
- # Define argparse arguments
16
- parser = argparse.ArgumentParser(description='Extract audio from a YouTube video')
17
- parser.add_argument('url', type=str, help='the YouTube video URL')
18
- parser.add_argument('person', type=str, help='the name of the person speaking')
19
- parser.add_argument('-s', '--start-time', type=float, default=0, help='the start time in minutes for the extracted audio (default: 0)')
20
- parser.add_argument('-d', '--duration', type=int, help='the duration in seconds for the extracted audio (default: 60)')
21
-
22
-
23
- # 200 seconds seems to be max duration for single clips
24
- def extract_audio(url: str, label: str, start_minute: float = 0, duration: int = 200):
25
-
26
- # Download the YouTube video
27
- youtube_object = YouTube(url)
28
- stream = youtube_object.streams.first()
29
- video_path = Path(stream.download(skip_existing=True))
30
-
31
- # Convert start time to seconds
32
- start_time_seconds = int(start_minute * 60)
33
-
34
- # Format the start time in HH:MM:SS.mmm format
35
- start_time_formatted = str(datetime.timedelta(seconds=start_time_seconds))
36
- start_time_formatted = start_time_formatted[:11] + start_time_formatted[12:]
37
-
38
- # Set the output path using the audio file name
39
- output_path = video_path.parent / f"{label}.wav"
40
-
41
- # Run ffmpeg to extract the audio
42
- cmd = ['ffmpeg', '-y', '-i', str(video_path), '-ss', start_time_formatted]
43
- if duration is not None:
44
- # Format the duration in HH:MM:SS.mmm format
45
- duration_formatted = str(datetime.timedelta(seconds=duration))
46
- duration_formatted = duration_formatted[:11] + duration_formatted[12:]
47
- cmd += ['-t', duration_formatted]
48
- cmd += ['-q:a', '0', '-map', 'a', str(output_path)]
49
- subprocess.run(cmd)
50
-
51
- # remove the extra .3gpp file that is created:
52
- for file in os.listdir(video_path.parent):
53
- if file.endswith(".3gpp"):
54
- os.remove(os.path.join(video_path.parent, file))
55
-
56
- return output_path
57
-
58
- if __name__ == '__main__':
59
-
60
- # Parse the arguments
61
- args = parser.parse_args()
62
-
63
- # Extract the audio
64
- extract_audio(args.url, args.person, args.start_time, args.duration)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/src/utils.py DELETED
@@ -1,16 +0,0 @@
1
- import time
2
- import logging
3
-
4
- log = logging.getLogger(__name__)
5
-
6
- # Decorator to time a function
7
- def timeit(func):
8
- def timed(*args, **kwargs):
9
- time_start = time.time()
10
- result = func(*args, **kwargs)
11
- _yellow = "\x1b[33;20m"
12
- _reset = "\x1b[0m"
13
- _msg = f"{_yellow}{func.__name__} duration: {time.time() - time_start:.2f} seconds{_reset}"
14
- log.info(_msg)
15
- return result
16
- return timed