""" Gateway de IA de Qualidot - Módulo de Adaptadores de Transcripción Propósito: Este módulo contiene funciones de adaptadores que permiten transcribir audio usando diferentes proveedores de IA. Cada función de adaptador se encarga de interactuar con un proveedor específico (como OpenAI, AssemblyAI, Deepgram, etc.) y de convertir la respuesta del proveedor al formato estándar de transcripción de Qualidot. """ import tempfile import os import json from dotenv import load_dotenv from fastapi import HTTPException from openai import OpenAI, AsyncOpenAI import assemblyai as aai from deepgram import ( DeepgramClient, ) from app.core.config import settings from app.schemas.audio_standard import AudioRequestFile from app.schemas.audio_standard import StandardTranscriptionResult from app.utilities.audio_utilities import validate_audio_file, validate_audio_size, validate_audio_request from app.core import config # Función de adaptador principal que infiere el proveedor y llama al adaptador específico async def transcribe_audio_with_provider(audio_request: AudioRequestFile) -> StandardTranscriptionResult: """ Función de adaptador para transcribir audio usando el proveedor de IA configurado. """ load_dotenv() provider = audio_request.provider.lower() match provider: case "openai": return await transcribe_with_openai(audio_request) case "assemblyai": return await transcribe_with_assemblyai(audio_request) case "deepgram": return await transcribe_with_deepgram(audio_request) case _: raise ValueError(f"Proveedor de IA no soportado: {audio_request.provider}") # Función de adaptador para transcribir audio usando OpenAI async def transcribe_with_openai(audio_request: AudioRequestFile) -> StandardTranscriptionResult: """ Función de adaptador para transcribir audio usando OpenAI. """ client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY) audio_content = await audio_request.file.read() temp_audio_path = None # Inicializamos la variable fuera del try # Validar el audio antes de continuar validate_audio_request(audio_request, audio_content) try: # Crear archivo temporal para el audio with tempfile.NamedTemporaryFile( delete=False, suffix=os.path.splitext(audio_request.file.filename)[1] ) as temp_audio: temp_audio.write(audio_content) temp_audio_path = temp_audio.name with open(temp_audio_path, "rb") as audio_file_obj: transcription = await client.audio.transcriptions.create( model=audio_request.model, file=audio_file_obj, response_format="text" ) result = StandardTranscriptionResult( status="success", original_filename=audio_request.file.filename, full_transcript=transcription, model_used=audio_request.model, provider_used="OpenAI", confidence_score=None ) return result except Exception as e: # Capturamos cualquier error de OpenAI o de lectura de archivos raise HTTPException( status_code=500, detail=f"Error transcribiendo el audio: {str(e)}" ) finally: if temp_audio_path and os.path.exists(temp_audio_path): try: os.unlink(temp_audio_path) except Exception: pass # Función de adaptador para transcribir audio usando AssemblyAI async def transcribe_with_assemblyai(audio_request: AudioRequestFile) -> StandardTranscriptionResult: """ Función de adaptador para transcribir audio usando AssemblyAI. """ aai.settings.api_key = settings.ASSEMBLYAI_API_KEY audio_content = await audio_request.file.read() temp_audio_path = None # Validar el audio antes de continuar validate_audio_request(audio_request, audio_content) try: # Crear archivo temporal para el audio with tempfile.NamedTemporaryFile( delete=False, suffix=os.path.splitext(audio_request.file.filename)[1] ) as temp_audio: temp_audio.write(audio_content) temp_audio_path = temp_audio.name #Definimos el modelo a usar config = aai.TranscriptionConfig(speech_models = [audio_request.model], language_code="es", speaker_labels=audio_request.diarization, sentiment_analysis=audio_request.sentiment) transcription_obj = aai.Transcriber(config=config).transcribe(temp_audio_path) iterable = transcription_obj.sentiment_analysis if audio_request.sentiment else transcription_obj.utterances if transcription_obj.status == aai.TranscriptStatus.error: raise Exception(f"AssemblyAI Error: {transcription_obj.error}") assemblyaiSegment = "utterances" if not audio_request.sentiment else "sentiment_analysis" result = StandardTranscriptionResult( status="success", original_filename=audio_request.file.filename, full_transcript=transcription_obj.text, model_used=audio_request.model, provider_used="AssemblyAI", confidence_score=None, segments=[ { "text": segment.text, "speaker": segment.speaker if audio_request.diarization else None, "start_time": segment.start if audio_request.timestamps else None, "end_time": segment.end if audio_request.timestamps else None, "sentiment": segment.sentiment if audio_request.sentiment else None } for segment in iterable ] if (audio_request.diarization or audio_request.timestamps or audio_request.sentiment) else None ) return result except Exception as e: # Capturamos cualquier error de OpenAI o de lectura de archivos raise HTTPException( status_code=500, detail=f"Error transcribiendo el audio: {str(e)}" ) finally: if temp_audio_path and os.path.exists(temp_audio_path): try: os.unlink(temp_audio_path) except Exception: pass async def transcribe_with_deepgram(audio_request: AudioRequestFile): """ Función de adaptador para transcribir audio usando Deepgram. """ #Inicializamos el cliente de Deepgram deepgram = DeepgramClient(api_key=settings.DEEPGRAM_API_KEY) audio_content = await audio_request.file.read() temp_audio_path = None # Validar el audio antes de continuar validate_audio_request(audio_request, audio_content) try: # Crear archivo temporal para el audio with tempfile.NamedTemporaryFile( delete=False, suffix=os.path.splitext(audio_request.file.filename)[1] ) as temp_audio: temp_audio.write(audio_content) temp_audio_path = temp_audio.name with open(temp_audio_path, "rb") as audio_file: response = deepgram.listen.v1.media.transcribe_file( request=audio_file.read(), model=audio_request.model, sentiment=audio_request.sentiment, utterances=audio_request.diarization, diarize=audio_request.diarization, # Deepgram no tiene una opción específica de "timestamps", pero sí devuelve marcas de tiempo por segmento, así que no es necesario un parámetro adicional para eso smart_format=True, language='es', ) response_json = json.loads(response.json()) result = StandardTranscriptionResult( status="success", original_filename=audio_request.file.filename, full_transcript=response_json.get("results", {}).get("channels", [{}])[0].get("alternatives", [{}])[0].get("transcript", ""), model_used=audio_request.model, provider_used="Deepgram", confidence_score=response_json.get("results", {}).get("channels", [{}])[0].get("alternatives", [{}])[0].get("confidence"), segments=[ { "text": sentence.get("text", ""), "speaker": f"Speaker {paragraph.get('speaker')}" if audio_request.diarization and paragraph.get("speaker") is not None else None, "start_time": sentence.get("start") if audio_request.timestamps else None, "end_time": sentence.get("end") if audio_request.timestamps else None, "sentiment": sentence.get("sentiment") if audio_request.sentiment else None } for paragraph in response_json.get("results", {}).get("channels", [{}])[0].get("alternatives", [{}])[0].get("paragraphs", {}).get("paragraphs", []) for sentence in paragraph.get("sentences", []) ] if (audio_request.diarization or audio_request.timestamps or audio_request.sentiment) else None ) return result except Exception as e: # Capturamos cualquier error de OpenAI o de lectura de archivos raise HTTPException( status_code=500, detail=f"Error transcribiendo el audio: {str(e)}" ) finally: if temp_audio_path and os.path.exists(temp_audio_path): try: os.unlink(temp_audio_path) except Exception: pass