ia-microservice/app/services/audio/transcription_adapters.py

"""
Gateway de IA de Qualidot - Módulo de Adaptadores de Transcripción
Propósito:
Este módulo contiene funciones de adaptadores que permiten transcribir audio usando diferentes proveedores de IA.
Cada función de adaptador se encarga de interactuar con un proveedor específico (como OpenAI, AssemblyAI, Deepgram, etc.)
y de convertir la respuesta del proveedor al formato estándar de transcripción de Qualidot.
"""
import os
import json
import tempfile

from dotenv import load_dotenv
from fastapi import HTTPException
from openai import AsyncOpenAI
import assemblyai as aai
from deepgram import DeepgramClient

from app.core.config import settings
from app.schemas.audio_standard import AudioRequestFile, StandardTranscriptionResult
from app.utilities.audio_utilities import validate_audio_request

# Load environment variables once at import time instead of on every request
load_dotenv()
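
# NOTE: The schemas above live in app/schemas/audio_standard.py. Based on how they
# are used in this module (a summary, not the authoritative definition):
#   - AudioRequestFile carries: provider, model, file (an UploadFile-like object with
#     .filename and an async .read()), plus the diarization, sentiment and timestamps flags.
#   - StandardTranscriptionResult carries: status, original_filename, full_transcript,
#     model_used, provider_used, confidence_score and an optional list of segments.
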
# Main adapter function: infers the provider and delegates to the specific adapter
async def transcribe_audio_with_provider(audio_request: AudioRequestFile) -> StandardTranscriptionResult:
    """
    Adapter function that transcribes audio using the configured AI provider.
    """
    provider = audio_request.provider.lower()
    match provider:
        case "openai":
            return await transcribe_with_openai(audio_request)
        case "assemblyai":
            return await transcribe_with_assemblyai(audio_request)
        case "deepgram":
            return await transcribe_with_deepgram(audio_request)
        case _:
            raise ValueError(f"Unsupported AI provider: {audio_request.provider}")
# Adapter function for transcribing audio with OpenAI
async def transcribe_with_openai(audio_request: AudioRequestFile) -> StandardTranscriptionResult:
    """
    Adapter function that transcribes audio using OpenAI.
    """
    client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY)
    audio_content = await audio_request.file.read()
    temp_audio_path = None  # Initialized outside the try block so finally can check it
    # Validate the audio before continuing
    validate_audio_request(audio_request, audio_content)
    try:
        # Write the audio to a temporary file so the SDK can read it from disk
        with tempfile.NamedTemporaryFile(
            delete=False,
            suffix=os.path.splitext(audio_request.file.filename)[1]
        ) as temp_audio:
            temp_audio.write(audio_content)
            temp_audio_path = temp_audio.name
        with open(temp_audio_path, "rb") as audio_file_obj:
            # With response_format="text" the SDK returns the transcript as a plain string
            transcription = await client.audio.transcriptions.create(
                model=audio_request.model,
                file=audio_file_obj,
                response_format="text"
            )
        result = StandardTranscriptionResult(
            status="success",
            original_filename=audio_request.file.filename,
            full_transcript=transcription,
            model_used=audio_request.model,
            provider_used="OpenAI",
            confidence_score=None
        )
        return result
    except Exception as e:
        # Catch any OpenAI or file-handling error and surface it as an HTTP 500
        raise HTTPException(
            status_code=500,
            detail=f"Error transcribing the audio: {str(e)}"
        )
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            try:
                os.unlink(temp_audio_path)
            except Exception:
                pass

# Adapter function for transcribing audio with AssemblyAI
async def transcribe_with_assemblyai(audio_request: AudioRequestFile) -> StandardTranscriptionResult:
    """
    Adapter function that transcribes audio using AssemblyAI.
    """
    aai.settings.api_key = settings.ASSEMBLYAI_API_KEY
    audio_content = await audio_request.file.read()
    temp_audio_path = None
    # Validate the audio before continuing
    validate_audio_request(audio_request, audio_content)
    try:
        # Write the audio to a temporary file so the SDK can read it from disk
        with tempfile.NamedTemporaryFile(
            delete=False,
            suffix=os.path.splitext(audio_request.file.filename)[1]
        ) as temp_audio:
            temp_audio.write(audio_content)
            temp_audio_path = temp_audio.name
        # Configure the model and requested features (the SDK parameter is the
        # singular speech_model, not a speech_models list)
        config = aai.TranscriptionConfig(
            speech_model=audio_request.model,
            language_code="es",
            speaker_labels=audio_request.diarization,
            sentiment_analysis=audio_request.sentiment
        )
        # Note: this SDK call is blocking, so it runs synchronously inside the async adapter
        transcription_obj = aai.Transcriber(config=config).transcribe(temp_audio_path)
        if transcription_obj.status == aai.TranscriptStatus.error:
            raise Exception(f"AssemblyAI Error: {transcription_obj.error}")
        # Pick the segment source: sentiment results when requested, utterances otherwise
        iterable = transcription_obj.sentiment_analysis if audio_request.sentiment else transcription_obj.utterances
        result = StandardTranscriptionResult(
            status="success",
            original_filename=audio_request.file.filename,
            full_transcript=transcription_obj.text,
            model_used=audio_request.model,
            provider_used="AssemblyAI",
            confidence_score=None,
            segments=[
                {
                    "text": segment.text,
                    "speaker": segment.speaker if audio_request.diarization else None,
                    "start_time": segment.start if audio_request.timestamps else None,
                    "end_time": segment.end if audio_request.timestamps else None,
                    "sentiment": segment.sentiment if audio_request.sentiment else None
                }
                for segment in (iterable or [])
            ] if (audio_request.diarization or audio_request.timestamps or audio_request.sentiment) else None
        )
        return result
    except Exception as e:
        # Catch any AssemblyAI or file-handling error and surface it as an HTTP 500
        raise HTTPException(
            status_code=500,
            detail=f"Error transcribing the audio: {str(e)}"
        )
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            try:
                os.unlink(temp_audio_path)
            except Exception:
                pass
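
# Example of the standardized segment shape the adapters emit when diarization,
# timestamps and sentiment are all requested (illustrative values only):
#
#     {
#         "text": "Hola, gracias por llamar.",
#         "speaker": "A",
#         "start_time": 1240,
#         "end_time": 3180,
#         "sentiment": "POSITIVE",
#     }
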
# Adapter function for transcribing audio with Deepgram
async def transcribe_with_deepgram(audio_request: AudioRequestFile) -> StandardTranscriptionResult:
    """
    Adapter function that transcribes audio using Deepgram.
    """
    # Initialize the Deepgram client
    deepgram = DeepgramClient(api_key=settings.DEEPGRAM_API_KEY)
    audio_content = await audio_request.file.read()
    # Validate the audio before continuing
    validate_audio_request(audio_request, audio_content)
    try:
        # Deepgram accepts the raw bytes directly, so no temporary file is needed
        response = deepgram.listen.v1.media.transcribe_file(
            request=audio_content,
            model=audio_request.model,
            sentiment=audio_request.sentiment,
            utterances=audio_request.diarization,
            diarize=audio_request.diarization,
            # Deepgram has no dedicated "timestamps" option; it already returns
            # per-segment timestamps, so no extra parameter is needed
            smart_format=True,
            language='es',
        )
        response_json = json.loads(response.json())
        # The transcript and confidence live under results.channels[0].alternatives[0]
        alternative = response_json.get("results", {}).get("channels", [{}])[0].get("alternatives", [{}])[0]
        result = StandardTranscriptionResult(
            status="success",
            original_filename=audio_request.file.filename,
            full_transcript=alternative.get("transcript", ""),
            model_used=audio_request.model,
            provider_used="Deepgram",
            confidence_score=alternative.get("confidence"),
            segments=[
                {
                    "text": sentence.get("text", ""),
                    "speaker": f"Speaker {paragraph.get('speaker')}" if audio_request.diarization and paragraph.get("speaker") is not None else None,
                    "start_time": sentence.get("start") if audio_request.timestamps else None,
                    "end_time": sentence.get("end") if audio_request.timestamps else None,
                    "sentiment": sentence.get("sentiment") if audio_request.sentiment else None
                }
                for paragraph in alternative.get("paragraphs", {}).get("paragraphs", [])
                for sentence in paragraph.get("sentences", [])
            ] if (audio_request.diarization or audio_request.timestamps or audio_request.sentiment) else None
        )
        return result
    except Exception as e:
        # Catch any Deepgram or file-handling error and surface it as an HTTP 500
        raise HTTPException(
            status_code=500,
            detail=f"Error transcribing the audio: {str(e)}"
        )
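
# Endpoint sketch (illustrative; the real route, schema construction and field
# names in the gateway may differ):
#
#     from fastapi import APIRouter, UploadFile
#
#     router = APIRouter()
#
#     @router.post("/transcriptions")
#     async def create_transcription(file: UploadFile, provider: str, model: str):
#         audio_request = AudioRequestFile(
#             provider=provider, model=model, file=file,
#             diarization=False, sentiment=False, timestamps=False,
#         )
#         return await transcribe_audio_with_provider(audio_request)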