opex792 commited on
Commit
b98faae
·
verified ·
1 Parent(s): e8b8846

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +1 -166
app.py CHANGED
@@ -54,27 +54,6 @@ embeddings_table = "movie_embeddings"
54
  query_cache_table = "query_cache"
55
  movies_table = "Movies" # Имя таблицы с фильмами
56
 
57
- # Максимальный размер таблицы кэша запросов в байтах (50MB)
58
- MAX_CACHE_SIZE = 50 * 1024 * 1024
59
-
60
- # Очередь для необработанных фильмов
61
- movies_queue = queue.Queue()
62
-
63
- # Флаг, указывающий, что обработка фильмов завершена
64
- processing_complete = False
65
-
66
- # Флаг, указывающий, что выполняется поиск
67
- search_in_progress = False
68
-
69
- # Блокировка для доступа к базе данных
70
- db_lock = threading.Lock()
71
-
72
- # Размер пакета для обработки эмбеддингов
73
- batch_size = 32
74
-
75
- # Количество потоков для параллельной обработки
76
- num_threads = 5
77
-
78
  # FastAPI приложение
79
  app = FastAPI()
80
 
@@ -143,46 +122,6 @@ def encode_string(text):
143
  embedding = model.encode(text, convert_to_tensor=True, normalize_embeddings=True)
144
  return embedding.cpu().numpy()
145
 
146
- def get_movies_without_embeddings():
147
- """Получает список фильмов, для которых нужно создать эмбеддинги."""
148
- conn = get_db_connection()
149
- if conn is None:
150
- return []
151
-
152
- movies_to_process = []
153
- try:
154
- with conn.cursor() as cur:
155
- # Получаем список ID фильмов, которые уже есть в таблице эмбеддингов
156
- cur.execute(f"SELECT movie_id FROM \"{embeddings_table}\"")
157
- existing_ids = {row[0] for row in cur.fetchall()}
158
-
159
- # Получаем список всех фильмов из таблицы Movies с подготовленной строкой
160
- cur.execute(f"""
161
- SELECT id, data,
162
- jsonb_build_object(
163
- 'Название', data->>'name',
164
- 'Год', data->>'year',
165
- 'Жанры', (SELECT string_agg(genre->>'name', ', ') FROM jsonb_array_elements(data->'genres') AS genre),
166
- 'Описание', COALESCE(data->>'description', '')
167
- ) AS prepared_json
168
- FROM "{movies_table}"
169
- """)
170
- all_movies = cur.fetchall()
171
-
172
- # Фильтруем только те фильмы, которых нет в таблице эмбеддингов
173
- for movie_id, movie_data, prepared_json in all_movies:
174
- if movie_id not in existing_ids:
175
- prepared_string = f"Название: {prepared_json['Название']}\nГод: {prepared_json['Год']}\nЖанры: {prepared_json['Жанры']}\nОписание: {prepared_json['Описание']}"
176
- movies_to_process.append((movie_id, movie_data, prepared_string))
177
-
178
- logging.info(f"Найдено {len(movies_to_process)} фильмов для обработки.")
179
- except Exception as e:
180
- logging.error(f"Ошибка при получении списка фильмов для обработки: {e}")
181
- finally:
182
- conn.close()
183
-
184
- return movies_to_process
185
-
186
  def get_embedding_from_db(conn, table_name, crc32_column, crc32_value, model_name):
187
  """Получает эмбеддинг из базы данных."""
188
  try:
@@ -197,97 +136,6 @@ def get_embedding_from_db(conn, table_name, crc32_column, crc32_value, model_nam
197
  logging.error(f"Ошибка при получении эмбеддинга из БД: {e}")
198
  return None
199
 
200
- def insert_embedding(conn, table_name, movie_id, embedding_crc32, string_crc32, embedding):
201
- """Вставляет эмбеддинг в базу данных."""
202
- try:
203
- # Нормализуем эмбеддинг перед сохранением
204
- normalized_embedding = normalize(embedding.reshape(1, -1))[0]
205
- with conn.cursor() as cur:
206
- cur.execute(f"""
207
- INSERT INTO "{table_name}"
208
- (movie_id, embedding_crc32, string_crc32, model_name, embedding)
209
- VALUES (%s, %s, %s, %s, %s)
210
- ON CONFLICT (movie_id) DO NOTHING
211
- """, (movie_id, embedding_crc32, string_crc32, model_name, normalized_embedding.tolist()))
212
- conn.commit()
213
- return True
214
- except Exception as e:
215
- logging.error(f"Ошибка при вставке эмбеддинга: {e}")
216
- conn.rollback()
217
- return False
218
-
219
- def process_batch(batch):
220
- """Обрабатывает пакет фильмов, создавая для них эмбеддинги."""
221
- conn = get_db_connection()
222
- if conn is None:
223
- return
224
-
225
- try:
226
- for movie_id, movie_data, prepared_string in batch:
227
- string_crc32 = calculate_crc32(prepared_string)
228
-
229
- # Проверяем существующий эмбеддинг
230
- existing_embedding = get_embedding_from_db(conn, embeddings_table, "string_crc32", string_crc32, model_name)
231
-
232
- if existing_embedding is None:
233
- embedding = encode_string(prepared_string)
234
- embedding_crc32 = calculate_crc32(str(embedding.tolist()))
235
-
236
- if insert_embedding(conn, embeddings_table, movie_id, embedding_crc32, string_crc32, embedding):
237
- logging.info(f"Сохранен эмбеддинг для '{movie_data['name']}' (ID: {movie_id})")
238
- else:
239
- logging.error(f"Ошибка сохранения эмбеддинга для '{movie_data['name']}' (ID: {movie_id})")
240
- else:
241
- logging.info(f"Эмбеддинг для '{movie_data['name']}' (ID: {movie_id}) уже существует")
242
- except Exception as e:
243
- logging.error(f"Ошибка при обработке пакета фильмов: {e}")
244
- finally:
245
- conn.close()
246
-
247
- def process_movies():
248
- """Обрабатывает фильмы, создавая для них эмбеддинги."""
249
- global processing_complete
250
-
251
- logging.info("Начало обработки фильмов.")
252
-
253
- # Получаем список фильмов, которые нужно обработать
254
- movies_to_process = get_movies_without_embeddings()
255
-
256
- if not movies_to_process:
257
- logging.info("Все фильмы уже обработаны.")
258
- processing_complete = True
259
- return
260
-
261
- # Добавляем фильмы в очередь
262
- for movie in movies_to_process:
263
- movies_queue.put(movie)
264
-
265
- with ThreadPoolExecutor(max_workers=num_threads) as executor:
266
- try:
267
- while not movies_queue.empty():
268
- if search_in_progress:
269
- time.sleep(1)
270
- continue
271
-
272
- batch = []
273
- while not movies_queue.empty() and len(batch) < batch_size:
274
- try:
275
- movie = movies_queue.get_nowait()
276
- batch.append(movie)
277
- except queue.Empty:
278
- break
279
-
280
- if not batch:
281
- break
282
-
283
- executor.submit(process_batch, batch)
284
- logging.info(f"Отправлен на обработку пакет из {len(batch)} фильмов.")
285
- except Exception as e:
286
- logging.error(f"Ошибка при обработке фильмов: {e}")
287
-
288
- processing_complete = True
289
- logging.info("Обработка фильмов завершена")
290
-
291
  def get_movie_data_from_db(conn, movie_ids):
292
  """Получает данные фильмов из таблицы Movies по списку ID."""
293
  movie_data_dict = {}
@@ -362,11 +210,9 @@ def rerank_with_api(query, results, top_k):
362
  except requests.exceptions.RequestException as e:
363
  logging.error(f"Ошибка при запросе к API реранжировщика: {e}")
364
  return []
365
-
366
  def search_movies_internal(query: str, top_k: int = 25):
367
  """Внутренняя функция для поиска фильмов по запросу (используется и в Gradio, и в API)."""
368
- global search_in_progress
369
- search_in_progress = True
370
  start_time = time.time()
371
 
372
  try:
@@ -449,9 +295,6 @@ def search_movies_internal(query: str, top_k: int = 25):
449
  logging.error(f"Ошибка при выполнении поиска: {e}")
450
  raise
451
 
452
- finally:
453
- search_in_progress = False
454
-
455
  def search_movies(query, top_k=25):
456
  """Функция поиска фильмов для Gradio интерфейса."""
457
  try:
@@ -476,14 +319,6 @@ async def api_search_movies(query: str = Query(..., description="Поисков
476
  except Exception as e:
477
  raise HTTPException(status_code=500, detail=str(e))
478
 
479
- # Запускаем обработку фильмов в отдельном потоке (если ещё не запущена)
480
- if not 'processing_thread' in globals():
481
- processing_thread = threading.Thread(target=process_movies)
482
- processing_thread.start()
483
- elif not processing_thread.is_alive():
484
- processing_thread = threading.Thread(target=process_movies)
485
- processing_thread.start()
486
-
487
  # Создаем интерфейс Gradio
488
  iface = gr.Interface(
489
  fn=search_movies,
 
54
  query_cache_table = "query_cache"
55
  movies_table = "Movies" # Имя таблицы с фильмами
56
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
  # FastAPI приложение
58
  app = FastAPI()
59
 
 
122
  embedding = model.encode(text, convert_to_tensor=True, normalize_embeddings=True)
123
  return embedding.cpu().numpy()
124
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
125
  def get_embedding_from_db(conn, table_name, crc32_column, crc32_value, model_name):
126
  """Получает эмбеддинг из базы данных."""
127
  try:
 
136
  logging.error(f"Ошибка при получении эмбеддинга из БД: {e}")
137
  return None
138
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
139
  def get_movie_data_from_db(conn, movie_ids):
140
  """Получает данные фильмов из таблицы Movies по списку ID."""
141
  movie_data_dict = {}
 
210
  except requests.exceptions.RequestException as e:
211
  logging.error(f"Ошибка при запросе к API реранжировщика: {e}")
212
  return []
213
+
214
  def search_movies_internal(query: str, top_k: int = 25):
215
  """Внутренняя функция для поиска фильмов по запросу (используется и в Gradio, и в API)."""
 
 
216
  start_time = time.time()
217
 
218
  try:
 
295
  logging.error(f"Ошибка при выполнении поиска: {e}")
296
  raise
297
 
 
 
 
298
  def search_movies(query, top_k=25):
299
  """Функция поиска фильмов для Gradio интерфейса."""
300
  try:
 
319
  except Exception as e:
320
  raise HTTPException(status_code=500, detail=str(e))
321
 
 
 
 
 
 
 
 
 
322
  # Создаем интерфейс Gradio
323
  iface = gr.Interface(
324
  fn=search_movies,