# forked from ericorps/ia-microservice
"""
|
|
Gateway de IA de Qualidot - Módulo de Adaptadores de Transcripción
|
|
|
|
Propósito:
|
|
Este módulo contiene funciones de adaptadores que permiten transcribir audio usando diferentes proveedores de IA.
|
|
Cada función de adaptador se encarga de interactuar con un proveedor específico (como OpenAI, AssemblyAI, Deepgram, etc.)
|
|
y de convertir la respuesta del proveedor al formato estándar de transcripción de Qualidot.
|
|
|
|
"""
import tempfile
import os

from fastapi import HTTPException
from openai import OpenAI, AsyncOpenAI
import assemblyai as aai

from app.core.config import settings
from app.schemas.audio_standard import AudioRequestFile
from app.schemas.audio_standard import StandardTranscriptionResult
from app.utilities.audio_utilities import validate_audio_file, validate_audio_size, validate_audio_request
from app.core import config

# Main adapter: infers the provider from the request and delegates to the
# provider-specific transcription adapter.
async def transcribe_audio_with_provider(audio_request: AudioRequestFile) -> StandardTranscriptionResult:
    """
    Dispatch the transcription request to the adapter of the configured AI provider.

    Supported providers (case-insensitive): "openai", "assemblyai".

    Raises:
        ValueError: if the request names a provider with no adapter.
    """
    normalized_provider = audio_request.provider.lower()

    if normalized_provider == "openai":
        return await transcribe_with_openai(audio_request)
    if normalized_provider == "assemblyai":
        return await transcribe_with_assemblyai(audio_request)

    raise ValueError(f"Proveedor de IA no soportado: {audio_request.provider}")

# Adapter for transcribing audio using OpenAI.
async def transcribe_with_openai(audio_request: AudioRequestFile) -> StandardTranscriptionResult:
    """
    Transcribe the uploaded audio file using OpenAI's transcription API.

    The upload is validated, spilled to a temporary file on disk (the SDK
    expects a real file object), sent to the API as plain text, and the
    response is wrapped in a StandardTranscriptionResult.

    Raises:
        HTTPException: status 500 on any failure while transcribing.
    """
    client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY)

    audio_content = await audio_request.file.read()

    # Track the temp-file path outside the try so the `finally` cleanup
    # works even if creation fails partway through.
    temp_audio_path = None

    # Validate the audio (type/size/request) before doing any work.
    validate_audio_request(audio_request, audio_content)

    try:
        # Persist the upload to a temp file, keeping the original extension
        # so the provider can detect the audio format.
        with tempfile.NamedTemporaryFile(
            delete=False,
            suffix=os.path.splitext(audio_request.file.filename)[1]
        ) as temp_audio:
            temp_audio.write(audio_content)
            temp_audio_path = temp_audio.name

        with open(temp_audio_path, "rb") as audio_file_obj:
            transcription = await client.audio.transcriptions.create(
                model=audio_request.model,
                file=audio_file_obj,
                response_format="text"  # plain string response, no JSON envelope
            )

        return StandardTranscriptionResult(
            status="success",
            original_filename=audio_request.file.filename,
            full_transcript=transcription,
            model_used=audio_request.model,
            provider_used="OpenAI",
            confidence_score=None  # OpenAI does not report a confidence value
        )

    except Exception as e:
        # Surface any provider or file-handling failure as an HTTP 500,
        # chaining the original exception for debuggability.
        raise HTTPException(
            status_code=500,
            detail=f"Error transcribiendo el audio: {str(e)}"
        ) from e

    finally:
        # Best-effort temp-file cleanup: only swallow OS-level removal
        # errors, never programming errors.
        if temp_audio_path and os.path.exists(temp_audio_path):
            try:
                os.unlink(temp_audio_path)
            except OSError:
                pass

# Adapter for transcribing audio using AssemblyAI.
async def transcribe_with_assemblyai(audio_request: AudioRequestFile) -> StandardTranscriptionResult:
    """
    Transcribe the uploaded audio file using AssemblyAI.

    Supports optional diarization (speaker labels), timestamps and sentiment
    analysis; when any of those flags is set, per-segment details are
    returned in `segments`, otherwise `segments` is None.

    Raises:
        HTTPException: status 500 on any failure while transcribing.
    """
    aai.settings.api_key = settings.ASSEMBLYAI_API_KEY

    audio_content = await audio_request.file.read()
    temp_audio_path = None

    # Validate the audio (type/size/request) before doing any work.
    validate_audio_request(audio_request, audio_content)

    try:
        # Persist the upload to a temp file, keeping the original extension
        # so the provider can detect the audio format.
        with tempfile.NamedTemporaryFile(
            delete=False,
            suffix=os.path.splitext(audio_request.file.filename)[1]
        ) as temp_audio:
            temp_audio.write(audio_content)
            temp_audio_path = temp_audio.name

        # Transcription settings. Named `transcription_config` so it no
        # longer shadows the module-level `config` import.
        transcription_config = aai.TranscriptionConfig(
            language_code="es",
            speaker_labels=audio_request.diarization,
            sentiment_analysis=audio_request.sentiment,
        )

        transcription_obj = aai.Transcriber(config=transcription_config).transcribe(temp_audio_path)

        # Fail fast before touching any per-segment result data.
        if transcription_obj.status == aai.TranscriptStatus.error:
            raise Exception(f"AssemblyAI Error: {transcription_obj.error}")

        # Sentiment results carry their own segment list; otherwise use the
        # speaker utterances. NOTE(review): `utterances` appears to be None
        # when speaker_labels is disabled — guarded with `or []` so building
        # timestamp-only segments cannot crash; confirm against SDK docs.
        iterable = (
            transcription_obj.sentiment_analysis
            if audio_request.sentiment
            else transcription_obj.utterances
        ) or []

        include_segments = (
            audio_request.diarization or audio_request.timestamps or audio_request.sentiment
        )

        return StandardTranscriptionResult(
            status="success",
            original_filename=audio_request.file.filename,
            full_transcript=transcription_obj.text,
            model_used=audio_request.model,
            provider_used="AssemblyAI",
            confidence_score=None,
            segments=[
                {
                    "text": segment.text,
                    "speaker": segment.speaker if audio_request.diarization else None,
                    "start_time": segment.start if audio_request.timestamps else None,
                    "end_time": segment.end if audio_request.timestamps else None,
                    "sentiment": segment.sentiment if audio_request.sentiment else None,
                }
                for segment in iterable
            ] if include_segments else None,
        )

    except Exception as e:
        # Surface any provider or file-handling failure as an HTTP 500,
        # chaining the original exception for debuggability.
        raise HTTPException(
            status_code=500,
            detail=f"Error transcribiendo el audio: {str(e)}"
        ) from e

    finally:
        # Best-effort temp-file cleanup: only swallow OS-level removal
        # errors, never programming errors.
        if temp_audio_path and os.path.exists(temp_audio_path):
            try:
                os.unlink(temp_audio_path)
            except OSError:
                pass