티스토리 뷰

DEV

pyannote 테스트

SBP 2025. 10. 21. 10:52

import os
import torch
import whisper
from pyannote.audio import Pipeline
from pyannote.audio.pipelines.utils.hook import ProgressHook
from pydub import AudioSegment


# 화자 분리 및 파일 저장 가능
# 전사 및 화자 분리 후 단어 단위 결과를 문장 단위로 재구성

#------------------------------------------------------------------
# 준비 단계
#------------------------------------------------------------------
print("--- 모델 로드 시작 ---")
try:
    # pyannote.audio 사전 학습된 화자 분리 모델 로드
    pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-community-1",
    token="hf_token")
    pipeline.to(torch.device("cuda"))
   
    whisper_model = whisper.load_model("medium")
    whisper_model.to(torch.device("cuda"))
    print("--- 모델 로드 완료 ---")
except Exception as e:
    print(f"모델 로드 중 오류 발생: {e}")
    exit()

# 대상 오디오 파일
audio_file = "./input_audio/PEAKER_01.wav"


#------------------------------------------------------------------
# 1. 음성 전사 실행 (전체 오디오)
#------------------------------------------------------------------
print("--- 음성 전사 실행 중 ---")
# word_timestamps=True로 설정하여 단어 단위 시간 정보를 가져옵니다.
transcription_result = whisper_model.transcribe(audio_file, word_timestamps=True)
print("--- 음성 전사 완료 ---")
print("전사 결과(transcription_result):")
# print(transcription_result) # 출력량이 많으므로 주석 처리
print("-" * 30)

if not transcription_result['segments']:
    print("전사 결과가 비어 있습니다. (오디오 파일에 음성이 없거나 음질이 매우 낮음)")
    print("프로그램을 종료합니다.")
    exit()

#------------------------------------------------------------------
# 2. 화자 분리 실행
#------------------------------------------------------------------
print("--- 화자 분리 실행 중 ---")
with ProgressHook() as hook:
    diarization_output = pipeline(audio_file, hook=hook) # runs locally
print("--- 화자 분리 완료 ---")

#------------------------------------------------------------------
# 3. 화자 분리 결과와 단어별 전사 결과 병합
#------------------------------------------------------------------
print("--- 화자 분리 결과와 전사 결과 병합 시작 ---")
# 단어들을 저장할 리스트 초기화
word_level_results = []
diarization_turns = list(diarization_output.speaker_diarization)

# 전사된 단어를 순회하며 가장 적절한 화자 구간에 할당
for segment in transcription_result['segments']:
    if 'words' not in segment:
        continue # 단어 정보가 없는 경우 스킵

    for word_info in segment['words']:
        word_start = word_info['start']
        word_end = word_info['end']
        word_text = word_info['word'].strip()

        if not word_text:
            continue # 공백인 경우 스킵

        # 단어가 속하는 화자 구간 찾기
        assigned_speaker = None
        for turn, speaker in diarization_turns:
            if turn.start <= word_start < turn.end:
                assigned_speaker = speaker
                break
       
        # 할당된 화자가 있다면 리스트에 추가
        if assigned_speaker:
            word_level_results.append({
                "speaker": assigned_speaker,
                "text": word_text,
                "start": word_start,
                "end": word_end
            })

print("--- 화자 분리 결과와 전사 결과 병합 완료 ---")

#------------------------------------------------------------------
# 4. 단어를 문장 단위로 재구성 (수정)
#------------------------------------------------------------------
print("--- 단어를 문장 단위로 재구성 시작 ---")
sentence_results = []
current_sentence = None
silence_threshold = 0.2  # 문장 끝을 판단하는 침묵 시간(초)
max_sentence_length = 28.0 # 최대 문장 길이 제한 (5초)

for i, word in enumerate(word_level_results):
    if not current_sentence:
        current_sentence = {
            "speaker": word["speaker"],
            "start": word["start"],
            "end": word["end"],
            "text": word["text"]
        }
    else:
        # 문장 종료 조건: 화자 변경, 긴 침묵, 또는 문장 길이 초과
        is_speaker_change = word["speaker"] != current_sentence["speaker"]
        is_long_pause = (word["start"] - current_sentence["end"]) > silence_threshold
        is_sentence_too_long = (word["end"] - current_sentence["start"]) > max_sentence_length
       
        if is_speaker_change or is_long_pause or is_sentence_too_long:
            # 현재 문장 저장 및 초기화
            current_sentence["text"] = current_sentence["text"].strip().replace('.', '').replace('?', '').replace('!', '')
            sentence_results.append(current_sentence)

            # 새로운 문장 시작
            current_sentence = {
                "speaker": word["speaker"],
                "start": word["start"],
                "end": word["end"],
                "text": word["text"]
            }
        else:
            # 문장 이어가기
            current_sentence["text"] += " " + word["text"]
            current_sentence["end"] = word["end"]
           
# 마지막 문장 저장
if current_sentence:
    current_sentence["text"] = current_sentence["text"].strip().replace('.', '').replace('?', '').replace('!', '')
    sentence_results.append(current_sentence)

print("--- 단어를 문장 단위로 재구성 완료 ---")


#------------------------------------------------------------------
# 5. 화자별 음성데이터와 텍스트 파일 저장
#------------------------------------------------------------------
print("--- 화자별 음성데이터 저장 및 단일 텍스트 파일 생성 시작 ---")
audio = AudioSegment.from_file(audio_file, format="wav")

# 침묵 시간 설정
SILENCE_DURATION_MS = 200

# pydub은 기본적으로 2 (16비트) sample_width를 사용하여 침묵을 생성합니다.
silence = AudioSegment.silent(
    duration=SILENCE_DURATION_MS,
    frame_rate=audio.frame_rate # frame_rate는 전달해야 병합 시 속성 일치 가능성이 높아집니다.
)

os.makedirs("output_audio", exist_ok=True)
# output_texts 폴더 대신 최종 텍스트 파일 경로를 설정합니다.
MASTER_TEXT_FILE = os.path.join("output_audio", "manifest.txt") # output_audio 폴더에 저장

# 모든 기록을 저장할 리스트 초기화
all_transcriptions = []
for i, res in enumerate(sentence_results):
    speaker_id = res['speaker']
    start_time = res['start']
    end_time = res['end']
    text = res['text']
   
    # a초에서 b초 사이의 데이터만 저장합니다.
    if 1 <= (end_time - start_time) <= 29:
        if text.strip():
            # 1. 오디오 파일에서 순수 음성 구간만 추출
            segment = audio[start_time * 1000 : end_time * 1000]
           
            # 2. 앞뒤에 침묵 세그먼트를 결합
            final_segment = silence + segment + silence
           
            # 파일 접두사 (침묵 추가 정보 포함)
            # 💡 WAV 파일 확장자까지 포함하여 파일명으로 사용합니다.
            audio_filename_only = f"{speaker_id}_{int(start_time)}_{int(end_time)}.wav"
            audio_path = os.path.join("output_audio", audio_filename_only)
           
            # 오디오 파일 저장
            # 최종 세그먼트 (침묵 + 음성 + 침묵) 저장
            final_segment.export(audio_path, format="wav")
           
            # 3. 마스터 텍스트 파일에 추가할 기록을 생성
            # 양식: 음성 파일명|0|내용
            transcription_line = f"{audio_filename_only}|0|{text.strip()}"
            all_transcriptions.append(transcription_line)
           
            print(f"저장 완료 (순수 침묵 추가됨): {audio_path}")
        else:
            print(f"건너뜀(전사 없음): 구간 길이: {end_time - start_time:.2f}s")
    else:
        print(f"건너뜀(길이 불충분): 구간 길이: {end_time - start_time:.2f}s, '{text}'")

# 모든 반복이 끝난 후, 단 하나의 마스터 텍스트 파일에 모든 기록을 저장합니다.
with open(MASTER_TEXT_FILE, "w", encoding="utf-8") as f:
    f.write("\n".join(all_transcriptions))

print("--- 모든 파일이 성공적으로 저장되었습니다 ---")

 

'DEV' 카테고리의 다른 글

python m4a to wav 파일 변환  (0) 2025.10.21
AI 화자 분리 및 전사 스크립트 분석  (1) 2025.10.16
Windows 11 WSL2 및 VS Code 설정 가이드  (0) 2025.10.16
PyAnnote.Audio 개발 환경 구축 절차  (0) 2025.10.16
CRON 표기법  (1) 2025.09.11
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/04   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30
글 보관함