# forked from ericorps/ia-microservice
"""
|
|
Gateway de IA de Qualidot - Módulo de Adaptadores de Transcripción
|
|
|
|
Propósito:
|
|
Este módulo contiene funciones de adaptadores que permiten transcribir audio usando diferentes proveedores de IA.
|
|
Cada función de adaptador se encarga de interactuar con un proveedor específico (como OpenAI, AssemblyAI, Deepgram, etc.)
|
|
y de convertir la respuesta del proveedor al formato estándar de transcripción de Qualidot.
|
|
|
|
"""
|
|
|
|
import tempfile
|
|
import os
|
|
import json
|
|
from dotenv import load_dotenv
|
|
from fastapi import HTTPException
|
|
from openai import OpenAI, AsyncOpenAI
|
|
import assemblyai as aai
|
|
from deepgram import (
|
|
DeepgramClient,
|
|
)
|
|
from app.core.config import settings
|
|
from app.schemas.audio_standard import AudioRequestFile
|
|
from app.schemas.audio_standard import StandardTranscriptionResult
|
|
from app.utilities.audio_utilities import validate_audio_file, validate_audio_size, validate_audio_request
|
|
from app.core import config
|
|
|
|
# Función de adaptador principal que infiere el proveedor y llama al adaptador específico
|
|
async def transcribe_audio_with_provider(audio_request: AudioRequestFile) -> StandardTranscriptionResult:
    """
    Route a transcription request to the adapter for the configured AI provider.

    Args:
        audio_request: Upload wrapper carrying the file, provider name and options.

    Returns:
        The standard transcription result produced by the selected adapter.

    Raises:
        ValueError: if the requested provider is not supported.
    """
    load_dotenv()

    # Provider name (case-insensitive) -> adapter coroutine.
    adapters = {
        "openai": transcribe_with_openai,
        "assemblyai": transcribe_with_assemblyai,
        "deepgram": transcribe_with_deepgram,
    }

    adapter = adapters.get(audio_request.provider.lower())
    if adapter is None:
        raise ValueError(f"Proveedor de IA no soportado: {audio_request.provider}")

    return await adapter(audio_request)
|
|
|
|
# Función de adaptador para transcribir audio usando OpenAI
|
|
async def transcribe_with_openai(audio_request: AudioRequestFile) -> StandardTranscriptionResult:
    """
    Adapter that transcribes audio using OpenAI.

    Writes the uploaded audio to a temporary file, sends it to the OpenAI
    transcription endpoint, and maps the response to Qualidot's standard
    transcription result.

    Args:
        audio_request: Upload wrapper carrying the file, model name and options.

    Returns:
        StandardTranscriptionResult with the plain-text transcript.

    Raises:
        HTTPException: 500 on any provider or file-handling error.
    """
    client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY)

    audio_content = await audio_request.file.read()

    temp_audio_path = None  # initialized outside the try so `finally` can always check it

    # Validate the audio before doing any work with the provider.
    validate_audio_request(audio_request, audio_content)

    try:
        # Persist the upload to a temp file, keeping the original extension so
        # the provider can detect the audio format.
        with tempfile.NamedTemporaryFile(
            delete=False,
            suffix=os.path.splitext(audio_request.file.filename)[1]
        ) as temp_audio:
            temp_audio.write(audio_content)
            temp_audio_path = temp_audio.name

        with open(temp_audio_path, "rb") as audio_file_obj:
            # response_format="text" makes the SDK return the transcript as a plain string.
            transcription = await client.audio.transcriptions.create(
                model=audio_request.model,
                file=audio_file_obj,
                response_format="text"
            )

        result = StandardTranscriptionResult(
            status="success",
            original_filename=audio_request.file.filename,
            full_transcript=transcription,
            model_used=audio_request.model,
            provider_used="OpenAI",
            confidence_score=None  # OpenAI does not report a confidence score
        )

        return result

    except Exception as e:
        # Surface any OpenAI or file-handling error as a 500, chaining the
        # original exception so the real cause survives in tracebacks.
        raise HTTPException(
            status_code=500,
            detail=f"Error transcribiendo el audio: {str(e)}"
        ) from e

    finally:
        # Best-effort removal of the temp file; only filesystem errors are ignored.
        if temp_audio_path and os.path.exists(temp_audio_path):
            try:
                os.unlink(temp_audio_path)
            except OSError:
                pass
|
|
|
|
# Función de adaptador para transcribir audio usando AssemblyAI
|
|
async def transcribe_with_assemblyai(audio_request: AudioRequestFile) -> StandardTranscriptionResult:
    """
    Adapter that transcribes audio using AssemblyAI.

    Writes the uploaded audio to a temporary file, runs an AssemblyAI
    transcription (optionally with diarization and sentiment analysis), and
    maps the response to Qualidot's standard transcription result.

    Args:
        audio_request: Upload wrapper carrying the file, model name and options.

    Returns:
        StandardTranscriptionResult with the transcript and optional segments.

    Raises:
        HTTPException: 500 on any provider or file-handling error.
    """
    aai.settings.api_key = settings.ASSEMBLYAI_API_KEY

    audio_content = await audio_request.file.read()
    temp_audio_path = None  # initialized outside the try so `finally` can always check it

    # Validate the audio before doing any work with the provider.
    validate_audio_request(audio_request, audio_content)

    try:
        # Persist the upload to a temp file, keeping the original extension so
        # the provider can detect the audio format.
        with tempfile.NamedTemporaryFile(
            delete=False,
            suffix=os.path.splitext(audio_request.file.filename)[1]
        ) as temp_audio:
            temp_audio.write(audio_content)
            temp_audio_path = temp_audio.name

        # Build the transcription config. Named `transcription_config` so it
        # does not shadow the `config` module imported at the top of the file.
        # NOTE(review): the AssemblyAI SDK parameter is usually `speech_model`
        # (singular, single value); confirm `speech_models=[...]` is valid for
        # the pinned assemblyai version.
        transcription_config = aai.TranscriptionConfig(
            speech_models=[audio_request.model],
            language_code="es",
            speaker_labels=audio_request.diarization,
            sentiment_analysis=audio_request.sentiment,
        )

        transcription_obj = aai.Transcriber(config=transcription_config).transcribe(temp_audio_path)

        # Fail fast on provider errors BEFORE reading result fields, which may
        # be missing or None on a failed transcription.
        if transcription_obj.status == aai.TranscriptStatus.error:
            raise Exception(f"AssemblyAI Error: {transcription_obj.error}")

        # Sentiment results carry their own segment list; otherwise use the
        # speaker utterances. Either may be None when the feature is disabled,
        # so fall back to an empty list to keep the comprehension safe.
        iterable = (
            transcription_obj.sentiment_analysis
            if audio_request.sentiment
            else transcription_obj.utterances
        ) or []

        result = StandardTranscriptionResult(
            status="success",
            original_filename=audio_request.file.filename,
            full_transcript=transcription_obj.text,
            model_used=audio_request.model,
            provider_used="AssemblyAI",
            confidence_score=None,
            segments=[
                {
                    "text": segment.text,
                    "speaker": segment.speaker if audio_request.diarization else None,
                    "start_time": segment.start if audio_request.timestamps else None,
                    "end_time": segment.end if audio_request.timestamps else None,
                    "sentiment": segment.sentiment if audio_request.sentiment else None
                }
                for segment in iterable
            ] if (audio_request.diarization or audio_request.timestamps or audio_request.sentiment) else None
        )

        return result

    except Exception as e:
        # Surface any AssemblyAI or file-handling error as a 500, chaining the
        # original exception so the real cause survives in tracebacks.
        raise HTTPException(
            status_code=500,
            detail=f"Error transcribiendo el audio: {str(e)}"
        ) from e

    finally:
        # Best-effort removal of the temp file; only filesystem errors are ignored.
        if temp_audio_path and os.path.exists(temp_audio_path):
            try:
                os.unlink(temp_audio_path)
            except OSError:
                pass
|
|
|
|
async def transcribe_with_deepgram(audio_request: AudioRequestFile) -> StandardTranscriptionResult:
    """
    Adapter that transcribes audio using Deepgram.

    Writes the uploaded audio to a temporary file, sends it to Deepgram
    (optionally with diarization and sentiment analysis), and maps the JSON
    response to Qualidot's standard transcription result.

    Args:
        audio_request: Upload wrapper carrying the file, model name and options.

    Returns:
        StandardTranscriptionResult with the transcript, confidence and
        optional per-sentence segments.

    Raises:
        HTTPException: 500 on any provider or file-handling error.
    """
    # Initialize the Deepgram client.
    deepgram = DeepgramClient(api_key=settings.DEEPGRAM_API_KEY)
    audio_content = await audio_request.file.read()
    temp_audio_path = None  # initialized outside the try so `finally` can always check it

    # Validate the audio before doing any work with the provider.
    validate_audio_request(audio_request, audio_content)

    try:
        # Persist the upload to a temp file, keeping the original extension so
        # the provider can detect the audio format.
        with tempfile.NamedTemporaryFile(
            delete=False,
            suffix=os.path.splitext(audio_request.file.filename)[1]
        ) as temp_audio:
            temp_audio.write(audio_content)
            temp_audio_path = temp_audio.name

        with open(temp_audio_path, "rb") as audio_file:
            response = deepgram.listen.v1.media.transcribe_file(
                request=audio_file.read(),
                model=audio_request.model,
                sentiment=audio_request.sentiment,
                utterances=audio_request.diarization,
                diarize=audio_request.diarization,
                # Deepgram has no dedicated "timestamps" option, but it always
                # returns per-segment time marks, so no extra parameter is needed.
                smart_format=True,
                language='es',
            )

        response_json = json.loads(response.json())

        # Transcript, confidence and paragraph data all live under the first
        # alternative of the first channel; extract that dict once instead of
        # repeating the nested .get(...) chain.
        alternative = (
            response_json.get("results", {})
            .get("channels", [{}])[0]
            .get("alternatives", [{}])[0]
        )

        result = StandardTranscriptionResult(
            status="success",
            original_filename=audio_request.file.filename,
            full_transcript=alternative.get("transcript", ""),
            model_used=audio_request.model,
            provider_used="Deepgram",
            confidence_score=alternative.get("confidence"),
            segments=[
                {
                    "text": sentence.get("text", ""),
                    "speaker": f"Speaker {paragraph.get('speaker')}" if audio_request.diarization and paragraph.get("speaker") is not None else None,
                    "start_time": sentence.get("start") if audio_request.timestamps else None,
                    "end_time": sentence.get("end") if audio_request.timestamps else None,
                    "sentiment": sentence.get("sentiment") if audio_request.sentiment else None
                }
                for paragraph in alternative.get("paragraphs", {}).get("paragraphs", [])
                for sentence in paragraph.get("sentences", [])
            ] if (audio_request.diarization or audio_request.timestamps or audio_request.sentiment) else None
        )

        return result

    except Exception as e:
        # Surface any Deepgram or file-handling error as a 500, chaining the
        # original exception so the real cause survives in tracebacks.
        raise HTTPException(
            status_code=500,
            detail=f"Error transcribiendo el audio: {str(e)}"
        ) from e

    finally:
        # Best-effort removal of the temp file; only filesystem errors are ignored.
        if temp_audio_path and os.path.exists(temp_audio_path):
            try:
                os.unlink(temp_audio_path)
            except OSError:
                pass