opex792 commited on
Commit
af2000a
·
verified ·
1 Parent(s): 9a46a7b

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +95 -14
app.py CHANGED
@@ -13,6 +13,7 @@ import logging
13
  from sklearn.preprocessing import normalize
14
  from concurrent.futures import ThreadPoolExecutor
15
  import requests
 
16
 
17
  # Настройка логирования
18
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -43,6 +44,9 @@ VOYAGE_API_KEY = os.environ.get("VOYAGE_API_KEY")
43
  if VOYAGE_API_KEY is None:
44
  raise ValueError("VOYAGE_API_KEY environment variable not set.")
45
 
 
 
 
46
  # Имена таблиц
47
  embeddings_table = "movie_embeddings"
48
  query_cache_table = "query_cache"
@@ -79,7 +83,19 @@ batch_size = 32
79
  num_threads = 5
80
 
81
  # Количество потоков для параллельного реранкинга
82
- rerank_threads = 5 # Подберите оптимальное значение
 
 
 
 
 
 
 
 
 
 
 
 
83
 
84
  def get_db_connection():
85
  """Устанавливает соединение с базой данных."""
@@ -298,8 +314,64 @@ def get_movie_embeddings(conn):
298
  logging.error(f"Ошибка при загрузке эмбеддингов фильмов: {e}")
299
  return movie_embeddings
300
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
301
  def rerank_batch_voyage(query, batch):
302
  """Переранжирует пакет результатов с помощью Voyage AI."""
 
 
 
 
303
  url = "https://api.voyageai.com/v1/rerank"
304
  headers = {
305
  "Authorization": f"Bearer {VOYAGE_API_KEY}",
@@ -315,6 +387,8 @@ def rerank_batch_voyage(query, batch):
315
  documents.append(movie_info)
316
  movie_ids.append(movie_id)
317
 
 
 
318
  payload = {
319
  "query": query,
320
  "documents": documents,
@@ -324,6 +398,13 @@ def rerank_batch_voyage(query, batch):
324
  }
325
 
326
  try:
 
 
 
 
 
 
 
327
  response = requests.post(url, headers=headers, json=payload)
328
  response.raise_for_status() # Проверка на ошибки HTTP
329
  response_json = response.json()
@@ -337,6 +418,12 @@ def rerank_batch_voyage(query, batch):
337
 
338
  except requests.exceptions.RequestException as e:
339
  logging.error(f"Ошибка запроса к Voyage AI: {e}")
 
 
 
 
 
 
340
  return []
341
  except KeyError as e:
342
  logging.error(f"Ошибка обработки ответа от Voyage AI: {e}. Полный ответ: {response_json}")
@@ -345,26 +432,20 @@ def rerank_batch_voyage(query, batch):
345
  def rerank_results(query, results):
346
  """Переранжирует результаты поиска с помощью Voyage AI."""
347
  logging.info(f"Начало переранжирования для запроса: '{query}'")
 
 
 
 
348
  reranked_results = []
349
 
350
  with ThreadPoolExecutor(max_workers=rerank_threads) as executor:
351
  futures = []
352
- batch = []
353
  batch_num = 0
354
- for i, result in enumerate(results):
355
- batch.append(result)
356
- if len(batch) >= batch_size: # Отправляем на реранк батчами
357
- logging.info(f"Отправка на переранжирование батча {batch_num+1} ({len(batch)} фильмов)")
358
- future = executor.submit(rerank_batch_voyage, query, batch)
359
- futures.append(future)
360
- batch = []
361
- batch_num += 1
362
-
363
- # Обработка остатка
364
- if batch:
365
  logging.info(f"Отправка на переранжирование батча {batch_num+1} ({len(batch)} фильмов)")
366
  future = executor.submit(rerank_batch_voyage, query, batch)
367
  futures.append(future)
 
368
 
369
  # Сбор результатов
370
  for i, future in enumerate(futures):
@@ -422,7 +503,7 @@ def search_movies(query, top_k=20):
422
  FROM {embeddings_table} m, query_embedding
423
  ORDER BY similarity DESC
424
  LIMIT %s
425
- """, (query_crc32, int(top_k * 1.1))) # Уменьшаем лимит до * 1.1
426
 
427
  results = cur.fetchall()
428
  logging.info(f"Найдено {len(results)} предварительных результатов поиска.")
 
13
  from sklearn.preprocessing import normalize
14
  from concurrent.futures import ThreadPoolExecutor
15
  import requests
16
+ import voyageai
17
 
18
  # Настройка логирования
19
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
44
  if VOYAGE_API_KEY is None:
45
  raise ValueError("VOYAGE_API_KEY environment variable not set.")
46
 
47
+ # Инициализация клиента Voyage AI
48
+ vo = voyageai.Client(api_key=VOYAGE_API_KEY)
49
+
50
  # Имена таблиц
51
  embeddings_table = "movie_embeddings"
52
  query_cache_table = "query_cache"
 
83
  num_threads = 5
84
 
85
  # Количество потоков для параллельного реранкинга
86
+ rerank_threads = 3 # Ограничено лимитом RPM
87
+
88
+ # Лимиты Voyage AI (запросов в минуту, токенов в минуту) - БЕСПЛАТНЫЙ АККАУНТ
89
+ RPM_LIMIT = 3
90
+ TPM_LIMIT = 10000
91
+
92
+ # Переменные для отслеживания текущего использования
93
+ current_rpm = 0
94
+ current_tpm = 0
95
+ last_reset_time = time.time()
96
+
97
+ # Среднее количество токенов на описание фильма (можно вычислить один раз при запуске)
98
+ avg_tokens_per_movie = 150 # Замените на более точное значение, если оно известно
99
 
100
  def get_db_connection():
101
  """Устанавливает соединение с базой данных."""
 
314
  logging.error(f"Ошибка при загрузке эмбеддингов фильмов: {e}")
315
  return movie_embeddings
316
 
317
+ def check_and_wait_for_limits():
318
+ """Проверяет лимиты RPM и TPM и ожидает, если они исчерпаны."""
319
+ global current_rpm, current_tpm, last_reset_time
320
+
321
+ elapsed_time = time.time() - last_reset_time
322
+
323
+ if elapsed_time >= 60:
324
+ current_rpm = 0
325
+ current_tpm = 0
326
+ last_reset_time = time.time()
327
+ logging.info("Лимиты RPM и TPM сброшены.")
328
+
329
+ if current_rpm >= RPM_LIMIT or current_tpm >= TPM_LIMIT:
330
+ wait_time = 60 - elapsed_time
331
+ logging.warning(f"Превышены лимиты RPM ({current_rpm}/{RPM_LIMIT}) или TPM ({current_tpm}/{TPM_LIMIT}). Ожидание {wait_time:.2f} секунд...")
332
+ time.sleep(max(0, wait_time))
333
+ current_rpm = 0
334
+ current_tpm = 0
335
+ last_reset_time = time.time()
336
+ logging.info("Лимиты RPM и TPM сброшены после ожидания.")
337
+
338
+ def create_optimized_batches(query, results, max_tokens_per_batch=TPM_LIMIT):
339
+ """Создает батчи для реранкинга, оптимизированные по количеству токенов."""
340
+ global avg_tokens_per_movie
341
+
342
+ batches = []
343
+ current_batch = []
344
+ current_batch_tokens = 0
345
+
346
+ query_tokens = vo.count_tokens([query], model="rerank-2")
347
+
348
+ for movie_id, _ in results:
349
+ movie = next((m for m in movies_data if m['id'] == movie_id), None)
350
+ if movie:
351
+ movie_info = f"Название: {movie['name']}\nГод: {movie['year']}\nЖанры: {movie['genreslist']}\nОписание: {movie['description']}"
352
+
353
+ # Считаем токены, но не отправляем запрос если лимит уже исчерпан
354
+ estimated_movie_tokens = avg_tokens_per_movie
355
+
356
+ if (current_batch_tokens + query_tokens + estimated_movie_tokens) <= max_tokens_per_batch:
357
+ current_batch.append((movie_id, _))
358
+ current_batch_tokens += estimated_movie_tokens
359
+ else:
360
+ batches.append(current_batch)
361
+ current_batch = [(movie_id, _)]
362
+ current_batch_tokens = estimated_movie_tokens
363
+
364
+ if current_batch:
365
+ batches.append(current_batch)
366
+
367
+ return batches
368
+
369
  def rerank_batch_voyage(query, batch):
370
  """Переранжирует пакет результатов с помощью Voyage AI."""
371
+ global current_rpm, current_tpm
372
+
373
+ check_and_wait_for_limits()
374
+
375
  url = "https://api.voyageai.com/v1/rerank"
376
  headers = {
377
  "Authorization": f"Bearer {VOYAGE_API_KEY}",
 
387
  documents.append(movie_info)
388
  movie_ids.append(movie_id)
389
 
390
+
391
+
392
  payload = {
393
  "query": query,
394
  "documents": documents,
 
398
  }
399
 
400
  try:
401
+ batch_tokens = vo.count_tokens([query] + documents, model="rerank-2")
402
+
403
+ current_rpm += 1
404
+ current_tpm += batch_tokens
405
+
406
+ logging.info(f"Отправка запроса к Voyage AI. RPM: {current_rpm}/{RPM_LIMIT}, TPM: {current_tpm}/{TPM_LIMIT}, Токенов в запросе: {batch_tokens}")
407
+
408
  response = requests.post(url, headers=headers, json=payload)
409
  response.raise_for_status() # Проверка на ошибки HTTP
410
  response_json = response.json()
 
418
 
419
  except requests.exceptions.RequestException as e:
420
  logging.error(f"Ошибка запроса к Voyage AI: {e}")
421
+
422
+ if response.status_code == 429: # Too Many Requests
423
+ logging.warning("Слишком много запросов к Voyage AI. Ожидание сброса лимитов...")
424
+ check_and_wait_for_limits()
425
+ return rerank_batch_voyage(query, batch) # Повторная попытка после ожидания
426
+
427
  return []
428
  except KeyError as e:
429
  logging.error(f"Ошибка обработки ответа от Voyage AI: {e}. Полный ответ: {response_json}")
 
432
  def rerank_results(query, results):
433
  """Переранжирует результаты поиска с помощью Voyage AI."""
434
  logging.info(f"Начало переранжирования для запроса: '{query}'")
435
+
436
+ # Создаем оптимизированные батчи
437
+ batches = create_optimized_batches(query, results)
438
+
439
  reranked_results = []
440
 
441
  with ThreadPoolExecutor(max_workers=rerank_threads) as executor:
442
  futures = []
 
443
  batch_num = 0
444
+ for batch in batches:
 
 
 
 
 
 
 
 
 
 
445
  logging.info(f"Отправка на переранжирование батча {batch_num+1} ({len(batch)} фильмов)")
446
  future = executor.submit(rerank_batch_voyage, query, batch)
447
  futures.append(future)
448
+ batch_num += 1
449
 
450
  # Сбор результатов
451
  for i, future in enumerate(futures):
 
503
  FROM {embeddings_table} m, query_embedding
504
  ORDER BY similarity DESC
505
  LIMIT %s
506
+ """, (query_crc32, int(top_k * 2))) # Увеличиваем лимит до * 2
507
 
508
  results = cur.fetchall()
509
  logging.info(f"Найдено {len(results)} предварительных результатов поиска.")