first commit

This commit is contained in:
suherdy.yacob@mapan.co.id 2025-05-30 08:20:24 +07:00
commit 1d722cf99b
37 changed files with 8612 additions and 0 deletions

13
.gitignore vendored Normal file

@ -0,0 +1,13 @@
venv
build
dist
speechlib.egg-info
.env
*.swp
*.swo
# By default do not include these files for version control
# Override this by using 'git add -f'
*.wav
*.mp3

214
README.md Normal file

@ -0,0 +1,214 @@
<p align="center">
<img src="speechlib.png" />
</p>
<p align="center">
<a href="./LICENSE"><img src="https://img.shields.io/github/license/Navodplayer1/speechlib"></a>
<a href="https://github.com/Navodplayer1/speechlib/releases"><img src="https://img.shields.io/github/v/release/Navodplayer1/speechlib?color=ffa"></a>
<a href="support os"><img src="https://img.shields.io/badge/os-linux%2C%20win%2C%20mac-pink.svg"></a>
<a href=""><img src="https://img.shields.io/badge/python-3.8+-aff.svg"></a>
<a href="https://github.com/Navodplayer1/speechlib/issues"><img src="https://img.shields.io/github/issues/Navodplayer1/speechlib?color=9cc"></a>
<a href="https://github.com/Navodplayer1/speechlib/stargazers"><img src="https://img.shields.io/github/stars/Navodplayer1/speechlib?color=ccf"></a>
<a href="https://pypi.org/project/speechlib/"><img src="https://static.pepy.tech/badge/speechlib"></a>
</p>
### Install torch torchaudio with CUDA support:
```
pip3 install torch torchaudio --index-url https://download.pytorch.org/whl/cu128
```
### Install torch torchaudio without CUDA support:
```
pip3 install torch torchaudio
```
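To quickly check that the CUDA build of torch can see your GPU, here is a minimal sketch (it only verifies that torch detects a CUDA device, not that cuBLAS/cuDNN are installed correctly):
```
import torch

# True means this torch build has CUDA support and at least one GPU is visible
print(torch.cuda.is_available())
```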
### Run your IDE as administrator
You will get the following error if administrator permission is not granted:
**OSError: [WinError 1314] A required privilege is not held by the client**
### Requirements
* Python 3.8 or greater
### GPU execution
GPU execution requires CUDA 11 and the following NVIDIA libraries to be installed:
* [cuBLAS for CUDA 11](https://developer.nvidia.com/cublas)
* [cuDNN 8 for CUDA 11](https://developer.nvidia.com/cudnn)
There are multiple ways to install these libraries. The recommended way is described in the official NVIDIA documentation, but we also suggest other installation methods below.
### Google Colab:
On Google Colab, run this to install the CUDA dependencies:
```
!apt install libcublas11
```
You can see this example [notebook](https://colab.research.google.com/drive/1lpoWrHl5443LSnTG3vJQfTcg9oFiCQSz?usp=sharing)
### Installation:
```
pip install speechlib
```
This library performs speaker diarization, speaker recognition, and transcription on a single wav file and produces a transcript with actual speaker names. It also returns an array containing the result information. ⚙
This library provides the following audio preprocessing functions:
1. convert other audio formats to wav
2. convert stereo wav file to mono
3. re-encode the wav file to have 16-bit PCM encoding
The Transcriptor takes 7 arguments:
1. file to transcribe
2. log_folder to store transcription
3. language used for transcribing (language code is used)
4. model size ("tiny", "small", "medium", "large", "large-v1", "large-v2", "large-v3")
5. ACCESS_TOKEN: huggingface access token
1. Permission to access `pyannote/speaker-diarization@2.1` and `pyannote/segmentation`
2. Token requires permission for 'Read access to contents of all public gated repos you can access'
6. voices_folder (contains speaker voice samples for speaker recognition)
7. quantization: this determines whether to use int8 quantization. Quantization may speed up the process but lower the accuracy.
voices_folder should contain subfolders named after the speakers. Each subfolder belongs to one speaker and can contain multiple voice samples; these samples are used by speaker recognition to identify the speaker.
If voices_folder is not provided, the speaker tags will be arbitrary (SPEAKER_00, SPEAKER_01, ...).
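The expected voices_folder layout (each subfolder name is used as the speaker label; the same structure is shown in the image under "voices_folder structure" below):
```
voices_folder
|---> person1
|     |---> sample1.wav
|     |---> sample2.wav
|     ...
|
|---> person2
|     |---> sample1.wav
|     |---> sample2.wav
|     ...
|--> ...
```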
log_folder is where the final transcript is stored as a text file.
The transcript also indicates the timeframe, in seconds, during which each speaker speaks.
### Transcription example:
```
import os
from speechlib import Transcriptor
file = "obama_zach.wav" # your audio file
voices_folder = "" # voices folder containing voice samples for recognition
language = "en" # language code
log_folder = "logs" # log folder for storing transcripts
modelSize = "tiny" # size of model to be used [tiny, small, medium, large-v1, large-v2, large-v3]
quantization = False # setting this 'True' may speed up the process but lower the accuracy
ACCESS_TOKEN = "huggingface api key" # get permission to access pyannote/speaker-diarization@2.1 on huggingface
# quantization only works on faster-whisper
transcriptor = Transcriptor(file, log_folder, language, modelSize, ACCESS_TOKEN, voices_folder, quantization)
# use normal whisper
res = transcriptor.whisper()
# use faster-whisper (simply faster)
res = transcriptor.faster_whisper()
# use a custom trained whisper model
res = transcriptor.custom_whisper("D:/whisper_tiny_model/tiny.pt")
# use a huggingface whisper model
res = transcriptor.huggingface_model("Jingmiao/whisper-small-chinese_base")
# use assembly ai model
res = transcriptor.assemby_ai_model("assemblyAI api key")
# res --> [["start", "end", "text", "speaker"], ["start", "end", "text", "speaker"], ...]
```
#### If you don't want speaker names, keep voices_folder as an empty string ""
start: starting time of speech in seconds
end: ending time of speech in seconds
text: transcribed text for speech during start and end
speaker: speaker of the text
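As a minimal sketch of how to consume the result (field order as documented above), you can iterate over the returned array like this:
```
# each entry in res is [start, end, text, speaker]
for start, end, text, speaker in res:
    print(f"{speaker} ({start} : {end}) : {text}")
```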
#### voices_folder structure:
![voices_folder_structure](voices_folder_structure1.png)
#### Transcription:
![transcription](transcript.png)
supported language codes:
```
"af", "am", "ar", "as", "az", "ba", "be", "bg", "bn", "bo", "br", "bs", "ca", "cs", "cy", "da", "de", "el", "en", "es", "et", "eu", "fa", "fi", "fo", "fr", "gl", "gu", "ha", "haw", "he", "hi", "hr", "ht", "hu", "hy", "id", "is","it", "ja", "jw", "ka", "kk", "km", "kn", "ko", "la", "lb", "ln", "lo", "lt", "lv", "mg", "mi", "mk", "ml", "mn","mr", "ms", "mt", "my", "ne", "nl", "nn", "no", "oc", "pa", "pl", "ps", "pt", "ro", "ru", "sa", "sd", "si", "sk","sl", "sn", "so", "sq", "sr", "su", "sv", "sw", "ta", "te", "tg", "th", "tk", "tl", "tr", "tt", "uk", "ur", "uz","vi", "yi", "yo", "zh", "yue"
```
supported language names:
```
"Afrikaans", "Amharic", "Arabic", "Assamese", "Azerbaijani", "Bashkir", "Belarusian", "Bulgarian", "Bengali","Tibetan", "Breton", "Bosnian", "Catalan", "Czech", "Welsh", "Danish", "German", "Greek", "English", "Spanish","Estonian", "Basque", "Persian", "Finnish", "Faroese", "French", "Galician", "Gujarati", "Hausa", "Hawaiian","Hebrew", "Hindi", "Croatian", "Haitian", "Hungarian", "Armenian", "Indonesian", "Icelandic", "Italian", "Japanese","Javanese", "Georgian", "Kazakh", "Khmer", "Kannada", "Korean", "Latin", "Luxembourgish", "Lingala", "Lao","Lithuanian", "Latvian", "Malagasy", "Maori", "Macedonian", "Malayalam", "Mongolian", "Marathi", "Malay", "Maltese","Burmese", "Nepali", "Dutch", "Norwegian Nynorsk", "Norwegian", "Occitan", "Punjabi", "Polish", "Pashto","Portuguese", "Romanian", "Russian", "Sanskrit", "Sindhi", "Sinhalese", "Slovak", "Slovenian", "Shona", "Somali","Albanian", "Serbian", "Sundanese", "Swedish", "Swahili", "Tamil", "Telugu", "Tajik", "Thai", "Turkmen", "Tagalog","Turkish", "Tatar", "Ukrainian", "Urdu", "Uzbek", "Vietnamese", "Yiddish", "Yoruba", "Chinese", "Cantonese",
```
### Audio preprocessing example:
```
from speechlib import PreProcessor
file = "obama1.mp3"
#initialize
prep = PreProcessor()
# convert mp3 to wav
wav_file = prep.convert_to_wav(file)
# convert wav file from stereo to mono
prep.convert_to_mono(wav_file)
# re-encode wav file to have 16-bit PCM encoding
prep.re_encode(wav_file)
```
### Performance
These metrics are from Google Colab tests. They do not include model download times and were measured without quantization enabled (quantization will make this even faster).

Test audio: obama_zach.wav, duration 6 min 36 s, run on GPU.

| faster-whisper model | diarization time | speaker recognition time | transcription time |
| --- | --- | --- | --- |
| tiny | 24s | 10s | 64s |
| small | 24s | 10s | 95s |
| medium | 24s | 10s | 193s |
| large | 24s | 10s | 343s |

11
config.ini Normal file

@ -0,0 +1,11 @@
[FILE]
mode = cpu
modelpath = d:\faster-whisper-large-v3-turbo-ct2
audiofolder = \audios
outputfolder = D:\speechlib\output
outputfolder2 = D:\speechlib\output
outputfolder3 = D:\speechlib\output
voicefolder = D:\speechlib\voices
beamsize = 6
batchsize = 8
accesstoken = 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

14
examples/.gitignore vendored Normal file

@ -0,0 +1,14 @@
example1.wav
temp
segments
pretrained_models
audio_cache
__pycache__
logs
greek_convo_short.mp3
greek_convo_short.wav
my_test.py
greek_convo.mp3
greek_convo.wav
.env
test.py

3
examples/README.md Normal file

@ -0,0 +1,3 @@
##### Run transcribe.py for transcribing an audio file
##### Run preprocess.py for preprocessing an audio file

13
examples/preprocess.py Normal file

@ -0,0 +1,13 @@
from speechlib import PreProcessor
file = "obama1.mp3"
#initialize
prep = PreProcessor()
# convert mp3 to wav
wav_file = prep.convert_to_wav(file)
# convert wav file from stereo to mono
prep.convert_to_mono(wav_file)
# re-encode wav file to have 16-bit PCM encoding
prep.re_encode(wav_file)

28
examples/transcribe.py Normal file

@ -0,0 +1,28 @@
import os
from speechlib import Transcriptor
file = "obama_zach.wav" # your audio file
voices_folder = "" # voices folder containing voice samples for recognition
language = "en" # language code
log_folder = "logs" # log folder for storing transcripts
modelSize = "tiny" # size of model to be used [tiny, small, medium, large-v1, large-v2, large-v3]
quantization = False # setting this 'True' may speed up the process but lower the accuracy
ACCESS_TOKEN = "huggingface access token" # get permission to access pyannote/speaker-diarization@2.1 on huggingface
# quantization only works on faster-whisper
transcriptor = Transcriptor(file, log_folder, language, modelSize, ACCESS_TOKEN, voices_folder, quantization)
# use normal whisper
res = transcriptor.whisper()
# use faster-whisper (simply faster)
#res = transcriptor.faster_whisper()
# use a custom trained whisper model
#res = transcriptor.custom_whisper("D:/whisper_tiny_model/tiny.pt")
# use a huggingface whisper model
#res = transcriptor.huggingface_model("Jingmiao/whisper-small-chinese_base")
# use assembly ai model
#res = transcriptor.assemby_ai_model("assemblyAI api key")

198
library.md Normal file

@ -0,0 +1,198 @@
### Run your IDE as administrator
You will get the following error if administrator permission is not granted:
**OSError: [WinError 1314] A required privilege is not held by the client**
### Requirements
* Python 3.8 or greater
### GPU execution
GPU execution requires CUDA 11 and the following NVIDIA libraries to be installed:
* [cuBLAS for CUDA 11](https://developer.nvidia.com/cublas)
* [cuDNN 8 for CUDA 11](https://developer.nvidia.com/cudnn)
There are multiple ways to install these libraries. The recommended way is described in the official NVIDIA documentation, but we also suggest other installation methods below.
### Google Colab:
On Google Colab, run this to install the CUDA dependencies:
```
!apt install libcublas11
```
You can see this example [notebook](https://colab.research.google.com/drive/1lpoWrHl5443LSnTG3vJQfTcg9oFiCQSz?usp=sharing)
### Installation:
```
pip install speechlib
```
This library performs speaker diarization, speaker recognition, and transcription on a single wav file and produces a transcript with actual speaker names. It also returns an array containing the result information. ⚙
This library provides the following audio preprocessing functions:
1. convert other audio formats to wav
2. convert stereo wav file to mono
3. re-encode the wav file to have 16-bit PCM encoding
The Transcriptor takes 7 arguments:
1. file to transcribe
2. log_folder to store transcription
3. language used for transcribing (language code is used)
4. model size ("tiny", "small", "medium", "large", "large-v1", "large-v2", "large-v3")
5. ACCESS_TOKEN: huggingface access token (also get permission to access `pyannote/speaker-diarization@2.1`)
6. voices_folder (contains speaker voice samples for speaker recognition)
7. quantization: this determines whether to use int8 quantization. Quantization may speed up the process but lower the accuracy.
voices_folder should contain subfolders named after the speakers. Each subfolder belongs to one speaker and can contain multiple voice samples; these samples are used by speaker recognition to identify the speaker.
If voices_folder is not provided, the speaker tags will be arbitrary (SPEAKER_00, SPEAKER_01, ...).
log_folder is where the final transcript is stored as a text file.
The transcript also indicates the timeframe, in seconds, during which each speaker speaks.
### Transcription example:
```
import os
from speechlib import Transcriptor
file = "obama_zach.wav" # your audio file
voices_folder = "" # voices folder containing voice samples for recognition
language = "en" # language code
log_folder = "logs" # log folder for storing transcripts
modelSize = "tiny" # size of model to be used [tiny, small, medium, large-v1, large-v2, large-v3]
quantization = False # setting this 'True' may speed up the process but lower the accuracy
ACCESS_TOKEN = "huggingface api key" # get permission to access pyannote/speaker-diarization@2.1 on huggingface
# quantization only works on faster-whisper
transcriptor = Transcriptor(file, log_folder, language, modelSize, ACCESS_TOKEN, voices_folder, quantization)
# use normal whisper
res = transcriptor.whisper()
# use faster-whisper (simply faster)
res = transcriptor.faster_whisper()
# use a custom trained whisper model
res = transcriptor.custom_whisper("D:/whisper_tiny_model/tiny.pt")
# use a huggingface whisper model
res = transcriptor.huggingface_model("Jingmiao/whisper-small-chinese_base")
# use assembly ai model
res = transcriptor.assemby_ai_model("assemblyAI api key")
# res --> [["start", "end", "text", "speaker"], ["start", "end", "text", "speaker"], ...]
```
#### If you don't want speaker names, keep voices_folder as an empty string ""
start: starting time of speech in seconds
end: ending time of speech in seconds
text: transcribed text for speech during start and end
speaker: speaker of the text
#### voices folder structure:
```
voices_folder
|---> person1
| |---> sample1.wav
| |---> sample2.wav
| ...
|
|---> person2
| |---> sample1.wav
| |---> sample2.wav
| ...
|--> ...
```
supported language codes:
```
"af", "am", "ar", "as", "az", "ba", "be", "bg", "bn", "bo", "br", "bs", "ca", "cs", "cy", "da", "de", "el", "en", "es", "et", "eu", "fa", "fi", "fo", "fr", "gl", "gu", "ha", "haw", "he", "hi", "hr", "ht", "hu", "hy", "id", "is","it", "ja", "jw", "ka", "kk", "km", "kn", "ko", "la", "lb", "ln", "lo", "lt", "lv", "mg", "mi", "mk", "ml", "mn","mr", "ms", "mt", "my", "ne", "nl", "nn", "no", "oc", "pa", "pl", "ps", "pt", "ro", "ru", "sa", "sd", "si", "sk","sl", "sn", "so", "sq", "sr", "su", "sv", "sw", "ta", "te", "tg", "th", "tk", "tl", "tr", "tt", "uk", "ur", "uz","vi", "yi", "yo", "zh", "yue"
```
supported language names:
```
"Afrikaans", "Amharic", "Arabic", "Assamese", "Azerbaijani", "Bashkir", "Belarusian", "Bulgarian", "Bengali","Tibetan", "Breton", "Bosnian", "Catalan", "Czech", "Welsh", "Danish", "German", "Greek", "English", "Spanish","Estonian", "Basque", "Persian", "Finnish", "Faroese", "French", "Galician", "Gujarati", "Hausa", "Hawaiian","Hebrew", "Hindi", "Croatian", "Haitian", "Hungarian", "Armenian", "Indonesian", "Icelandic", "Italian", "Japanese","Javanese", "Georgian", "Kazakh", "Khmer", "Kannada", "Korean", "Latin", "Luxembourgish", "Lingala", "Lao","Lithuanian", "Latvian", "Malagasy", "Maori", "Macedonian", "Malayalam", "Mongolian", "Marathi", "Malay", "Maltese","Burmese", "Nepali", "Dutch", "Norwegian Nynorsk", "Norwegian", "Occitan", "Punjabi", "Polish", "Pashto","Portuguese", "Romanian", "Russian", "Sanskrit", "Sindhi", "Sinhalese", "Slovak", "Slovenian", "Shona", "Somali","Albanian", "Serbian", "Sundanese", "Swedish", "Swahili", "Tamil", "Telugu", "Tajik", "Thai", "Turkmen", "Tagalog","Turkish", "Tatar", "Ukrainian", "Urdu", "Uzbek", "Vietnamese", "Yiddish", "Yoruba", "Chinese", "Cantonese",
```
### Audio preprocessing example:
```
from speechlib import PreProcessor
file = "obama1.mp3"
#initialize
prep = PreProcessor()
# convert mp3 to wav
wav_file = prep.convert_to_wav(file)
# convert wav file from stereo to mono
prep.convert_to_mono(wav_file)
# re-encode wav file to have 16-bit PCM encoding
prep.re_encode(wav_file)
```
### Performance
These metrics are from Google Colab tests. They do not include model download times and were measured without quantization enabled (quantization will make this even faster).

Test audio: obama_zach.wav, duration 6 min 36 s, run on GPU.

| faster-whisper model | diarization time | speaker recognition time | transcription time |
| --- | --- | --- | --- |
| tiny | 24s | 10s | 64s |
| small | 24s | 10s | 95s |
| medium | 24s | 10s | 193s |
| large | 24s | 10s | 343s |

42
main.py Normal file

@ -0,0 +1,42 @@
from speechlib import Transcriptor
from speechlib import PreProcessor
from configparser import ConfigParser
import os
config = ConfigParser()
config.read('config.ini')
mode = config.get('FILE', 'mode')
audiofolder = config.get('FILE', 'audiofolder')
access_token = config.get('FILE', 'accesstoken')
voicefolder = config.get('FILE', 'voicefolder')
language = "id"
quantization = False
modelSize = "medium"
### load the audio file in audio folder ###
current_dir = os.getcwd()
audio_dir = current_dir + audiofolder
output_dir = os.path.join(current_dir, "output")
if not os.path.exists(audio_dir):
os.makedirs(audio_dir)
print(f"Current directory: {current_dir}")
print(f"Audio directory: {audio_dir}")
print(f"Output directory: {output_dir}")
if not os.path.exists(output_dir):
os.makedirs(output_dir)
### Loop for each audio file in the audio folder ###
for filename in os.listdir(audio_dir):
if filename.endswith(".mp3") or filename.endswith(".wav"):
audiofile = os.path.join(audio_dir, filename)
print(f"Audio file: {audiofile}")
audiofilewithoutextension = audiofile.split(".mp3")[0].split(".wav")[0]
filenamewithoutextension = filename.split(".mp3")[0].split(".wav")[0]
filepath = os.path.join(output_dir, os.path.basename(audiofilewithoutextension).split('/')[-1]+".txt")
print(f"Output file: {filepath}")
filename = open(filepath, "w")
### transcribe ###
transcriptor = Transcriptor(audiofile, output_dir, language, modelSize, access_token, voicefolder, quantization)
res = transcriptor.faster_whisper()


@ -0,0 +1,58 @@
# ############################################################################
# Model: ECAPA big for Speaker verification
# ############################################################################
# Feature parameters
n_mels: 80
# Pretrain folder (HuggingFace)
pretrained_path: speechbrain/spkrec-ecapa-voxceleb
# Output parameters
out_n_neurons: 7205
# Model params
compute_features: !new:speechbrain.lobes.features.Fbank
n_mels: !ref <n_mels>
mean_var_norm: !new:speechbrain.processing.features.InputNormalization
norm_type: sentence
std_norm: False
embedding_model: !new:speechbrain.lobes.models.ECAPA_TDNN.ECAPA_TDNN
input_size: !ref <n_mels>
channels: [1024, 1024, 1024, 1024, 3072]
kernel_sizes: [5, 3, 3, 3, 1]
dilations: [1, 2, 3, 4, 1]
attention_channels: 128
lin_neurons: 192
classifier: !new:speechbrain.lobes.models.ECAPA_TDNN.Classifier
input_size: 192
out_neurons: !ref <out_n_neurons>
mean_var_norm_emb: !new:speechbrain.processing.features.InputNormalization
norm_type: global
std_norm: False
modules:
compute_features: !ref <compute_features>
mean_var_norm: !ref <mean_var_norm>
embedding_model: !ref <embedding_model>
mean_var_norm_emb: !ref <mean_var_norm_emb>
classifier: !ref <classifier>
label_encoder: !new:speechbrain.dataio.encoder.CategoricalEncoder
pretrainer: !new:speechbrain.utils.parameter_transfer.Pretrainer
loadables:
embedding_model: !ref <embedding_model>
mean_var_norm_emb: !ref <mean_var_norm_emb>
classifier: !ref <classifier>
label_encoder: !ref <label_encoder>
paths:
embedding_model: !ref <pretrained_path>/embedding_model.ckpt
mean_var_norm_emb: !ref <pretrained_path>/mean_var_norm_emb.ckpt
classifier: !ref <pretrained_path>/classifier.ckpt
label_encoder: !ref <pretrained_path>/label_encoder.txt

File diff suppressed because it is too large.

7
requirements.txt Normal file

@ -0,0 +1,7 @@
transformers
pydub
pyannote.audio
speechbrain
accelerate
faster-whisper
openai-whisper

4
speechlib/__init__.py Normal file

@ -0,0 +1,4 @@
from .speechlib import(
Transcriptor,
PreProcessor
)

Binary files not shown.

31
speechlib/convert_to_mono.py Normal file

@ -0,0 +1,31 @@
import wave
import numpy as np
def convert_to_mono(input_wav):
# Open the input WAV file
with wave.open(input_wav, 'rb') as input_file:
# Get the parameters of the input file
params = input_file.getparams()
# Check if the file is stereo
if params.nchannels > 1:
# Read the audio data
frames = input_file.readframes(-1)
audio_data = np.frombuffer(frames, dtype=np.int16)
# Take the average of the channels to convert to mono
mono_audio_data = np.mean(audio_data.reshape(-1, params.nchannels), axis=1)
# Create a new WAV file for mono audio
with wave.open(input_wav, 'wb') as output_file:
# Set the parameters for the output file
output_file.setparams((1, params.sampwidth, params.framerate, len(mono_audio_data), params.comptype, params.compname))
# Write the mono audio data to the output file
output_file.writeframes(np.int16(mono_audio_data))
print(f'{input_wav} converted to mono')
else:
print(f'{input_wav} is already a mono audio file.')

22
speechlib/convert_to_wav.py Normal file

@ -0,0 +1,22 @@
from pydub import AudioSegment
import os
def convert_to_wav(input_file):
# Load the MP3 file using pydub
# Check if the file is already in WAV format
if input_file.lower().endswith(".wav"):
print(f"{input_file} is already in WAV format.")
return input_file
audio = AudioSegment.from_file(input_file)
# Create the output WAV file path
wav_path = os.path.splitext(input_file)[0] + ".wav"
# Export the audio to WAV
audio.export(wav_path, format="wav")
print(f"{input_file} has been converted to WAV format.")
return wav_path

139
speechlib/core_analysis.py Normal file

@ -0,0 +1,139 @@
import os
from pyannote.audio import Pipeline
import time
from .wav_segmenter import (wav_file_segmentation)
import torch, torchaudio
from .speaker_recognition import (speaker_recognition)
from .write_log_file import (write_log_file)
from .re_encode import (re_encode)
from .convert_to_mono import (convert_to_mono)
from .convert_to_wav import (convert_to_wav)
# full pipeline: preprocess the wav file, run diarization, optionally run speaker recognition,
# then transcribe each speaker's segments
# for Sinhala ("si"), transcribe.py uses a fine-tuned Whisper model
def core_analysis(file_name, voices_folder, log_folder, language, modelSize, ACCESS_TOKEN, model_type, quantization=False, custom_model_path=None, hf_model_id=None, aai_api_key=None):
# <-------------------PreProcessing file-------------------------->
# check if file is in wav format, if not convert to wav
file_name = convert_to_wav(file_name)
# convert file to mono
convert_to_mono(file_name)
# re-encode file to 16-bit PCM encoding
re_encode(file_name)
# <--------------------running analysis--------------------------->
speaker_tags = []
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization@2.1",
use_auth_token=ACCESS_TOKEN)
if torch.cuda.is_available():
device = torch.device("cuda")
elif torch.backends.mps.is_available():
device = torch.device("mps")
os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1'
else:
device = torch.device("cpu")
pipeline.to(device)
waveform, sample_rate = torchaudio.load(file_name)
start_time = int(time.time())
print("running diarization...")
diarization = pipeline({"waveform": waveform, "sample_rate": sample_rate}, min_speakers=0, max_speakers=10)
end_time = int(time.time())
elapsed_time = int(end_time - start_time)
print(f"diarization done. Time taken: {elapsed_time} seconds.")
speakers = {}
common = []
# create a dictionary of SPEAKER_XX to real name mappings
speaker_map = {}
for turn, _, speaker in diarization.itertracks(yield_label=True):
start = round(turn.start, 1)
end = round(turn.end, 1)
common.append([start, end, speaker])
# find different speakers
if speaker not in speaker_tags:
speaker_tags.append(speaker)
speaker_map[speaker] = speaker
speakers[speaker] = []
speakers[speaker].append([start, end, speaker])
if voices_folder != None and voices_folder != "":
identified = []
start_time = int(time.time())
print("running speaker recognition...")
for spk_tag, spk_segments in speakers.items():
spk_name = speaker_recognition(file_name, voices_folder, spk_segments, identified)
spk = spk_name
identified.append(spk)
speaker_map[spk_tag] = spk
end_time = int(time.time())
elapsed_time = int(end_time - start_time)
print(f"speaker recognition done. Time taken: {elapsed_time} seconds.")
keys_to_remove = []
merged = []
# merging same speakers
for spk_tag1, spk_segments1 in speakers.items():
for spk_tag2, spk_segments2 in speakers.items():
if spk_tag1 not in merged and spk_tag2 not in merged and spk_tag1 != spk_tag2 and speaker_map[spk_tag1] == speaker_map[spk_tag2]:
for segment in spk_segments2:
speakers[spk_tag1].append(segment)
merged.append(spk_tag1)
merged.append(spk_tag2)
keys_to_remove.append(spk_tag2)
# fixing the speaker names in common
for segment in common:
speaker = segment[2]
segment[2] = speaker_map[speaker]
for key in keys_to_remove:
del speakers[key]
del speaker_map[key]
# transcribing the texts differently according to speaker
start_time = int(time.time())
print("running transcription...")
for spk_tag, spk_segments in speakers.items():
spk = speaker_map[spk_tag]
segment_out = wav_file_segmentation(file_name, spk_segments, language, modelSize, model_type, quantization, custom_model_path, hf_model_id, aai_api_key)
speakers[spk_tag] = segment_out
end_time = int(time.time())
elapsed_time = int(end_time - start_time)
print(f"transcription done. Time taken: {elapsed_time} seconds.")
common_segments = []
for item in common:
speaker = item[2]
start = item[0]
end = item[1]
for spk_tag, spk_segments in speakers.items():
if speaker == speaker_map[spk_tag]:
for segment in spk_segments:
if start == segment[0] and end == segment[1]:
common_segments.append([start, end, segment[2], speaker])
# writing log file
write_log_file(common_segments, log_folder, file_name, language)
return common_segments

37
speechlib/re_encode.py Normal file

@ -0,0 +1,37 @@
import wave
import struct
def re_encode(file_name):
with wave.open(file_name, 'rb') as original_file:
# Get the original audio parameters
params = original_file.getparams()
# Check if the sample width is already 16-bit
if params.sampwidth == 2:
print("The file already has 16-bit samples.")
elif params.sampwidth == 1:
# Open a new WAV file with 16-bit samples
file_name = file_name + '_16bit.wav'
with wave.open(file_name, 'wb') as new_file:
# Set the new audio parameters
new_file.setparams(params)
new_file.setsampwidth(2)
new_file.setnchannels(1)
# Read and convert each sample
for _ in range(params.nframes):
sample = original_file.readframes(1)
sample_value = struct.unpack("<B", sample)[0]
new_sample_value = (sample_value - 128) * 256
new_sample = struct.pack("<h", new_sample_value)
new_file.writeframes(new_sample)
print("Conversion completed. Saved as " + file_name)
else:
print("Unsupported sample width.")

82
speechlib/speaker_recognition.py Normal file

@ -0,0 +1,82 @@
from speechbrain.inference import SpeakerRecognition
import os
from pydub import AudioSegment
from collections import defaultdict
import torch
if torch.cuda.is_available():
verification = SpeakerRecognition.from_hparams(run_opts={"device":"cuda"}, source="speechbrain/spkrec-ecapa-voxceleb", savedir="pretrained_models/spkrec-ecapa-voxceleb")
else:
verification = SpeakerRecognition.from_hparams(run_opts={"device":"cpu"}, source="speechbrain/spkrec-ecapa-voxceleb", savedir="pretrained_models/spkrec-ecapa-voxceleb")
# recognize speaker name
def speaker_recognition(file_name, voices_folder, segments, wildcards):
speakers = os.listdir(voices_folder)
Id_count = defaultdict(int)
# Load the WAV file
audio = AudioSegment.from_file(file_name, format="wav")
folder_name = "temp"
if not os.path.exists(folder_name):
os.makedirs(folder_name)
i = 0
'''
iterate over segments and check speaker for increased accuracy.
assign speaker name to arbitrary speaker tag 'SPEAKER_XX'
'''
limit = 60
duration = 0
for segment in segments:
start = segment[0] * 1000 # start time in milliseconds
end = segment[1] * 1000 # end time in milliseconds
clip = audio[start:end]
i = i + 1
file = folder_name + "/" + file_name.split("/")[-1].split(".")[0] + "_segment"+ str(i) + ".wav"
clip.export(file, format="wav")
max_score = 0
person = "unknown" # if no match to any voice, then return unknown
for speaker in speakers:
voices = os.listdir(voices_folder + "/" + speaker)
for voice in voices:
voice_file = voices_folder + "/" + speaker + "/" + voice
try:
# compare voice file with audio file
score, prediction = verification.verify_files(voice_file, file)
prediction = prediction[0].item()
score = score[0].item()
if prediction == True:
if score >= max_score:
max_score = score
speakerId = speaker.split(".")[0]
if speakerId not in wildcards: # speaker_00 cannot be speaker_01
person = speakerId
except Exception as err:
print("error occured while speaker recognition: ", err)
Id_count[person] += 1
# Delete the WAV file after processing
os.remove(file)
current_pred = max(Id_count, key=Id_count.get)
duration += (end - start)
if duration >= limit and current_pred != "unknown":
break
most_common_Id = max(Id_count, key=Id_count.get)
return most_common_Id

283
speechlib/speechlib.py Normal file

@ -0,0 +1,283 @@
from .core_analysis import (core_analysis)
from .re_encode import (re_encode)
from .convert_to_mono import (convert_to_mono)
from .convert_to_wav import (convert_to_wav)
class Transcriptor:
def __init__(self, file, log_folder, language, modelSize, ACCESS_TOKEN, voices_folder=None, quantization=False):
'''
transcribe a wav file
arguments:
file: name of wav file with extension ex: file.wav
log_folder: name of folder where transcript will be stored
language: language of wav file
modelSize: tiny, small, medium, large, large-v1, large-v2, large-v3 (bigger model is more accurate but slow!!)
ACCESS_TOKEN: huggingface access token
voices_folder: folder containing subfolders named after each speaker with speaker voice samples in them. This will be used for speaker recognition
quantization: whether to use int8 quantization or not (default=False)
see documentation: https://github.com/Navodplayer1/speechlib
supported languages:
#### Afrikaans
"af",
#### Amharic
"am",
#### Arabic
"ar",
#### Assamese
"as",
#### Azerbaijani
"az",
#### Bashkir
"ba",
#### Belarusian
"be",
#### Bulgarian
"bg",
#### Bengali
"bn",
#### Tibetan
"bo",
#### Breton
"br",
#### Bosnian
"bs",
#### Catalan
"ca",
#### Czech
"cs",
#### Welsh
"cy",
#### Danish
"da",
#### German
"de",
#### Greek
"el",
#### English
"en",
#### Spanish
"es",
#### Estonian
"et",
#### Basque
"eu",
#### Persian
"fa",
#### Finnish
"fi",
#### Faroese
"fo",
#### French
"fr",
#### Galician
"gl",
#### Gujarati
"gu",
#### Hausa
"ha",
#### Hawaiian
"haw",
#### Hebrew
"he",
#### Hindi
"hi",
#### Croatian
"hr",
#### Haitian
"ht",
#### Hungarian
"hu",
#### Armenian
"hy",
#### Indonesian
"id",
#### Icelandic
"is",
#### Italian
"it",
#### Japanese
"ja",
#### Javanese
"jw",
#### Georgian
"ka",
#### Kazakh
"kk",
#### Khmer
"km",
#### Kannada
"kn",
#### Korean
"ko",
#### Latin
"la",
#### Luxembourgish
"lb",
#### Lingala
"ln",
#### Lao
"lo",
#### Lithuanian
"lt",
#### Latvian
"lv",
#### Malagasy
"mg",
#### Maori
"mi",
#### Macedonian
"mk",
#### Malayalam
"ml",
#### Mongolian
"mn",
#### Marathi
"mr",
#### Malay
"ms",
#### Maltese
"mt",
#### Burmese
"my",
#### Nepali
"ne",
#### Dutch
"nl",
#### Norwegian Nynorsk
"nn",
#### Norwegian
"no",
#### Occitan
"oc",
#### Punjabi
"pa",
#### Polish
"pl",
#### Pashto
"ps",
#### Portuguese
"pt",
#### Romanian
"ro",
#### Russian
"ru",
#### Sanskrit
"sa",
#### Sindhi
"sd",
#### Sinhalese
"si",
#### Slovak
"sk",
#### Slovenian
"sl",
#### Shona
"sn",
#### Somali
"so",
#### Albanian
"sq",
#### Serbian
"sr",
#### Sundanese
"su",
#### Swedish
"sv",
#### Swahili
"sw",
#### Tamil
"ta",
#### Telugu
"te",
#### Tajik
"tg",
#### Thai
"th",
#### Turkmen
"tk",
#### Tagalog
"tl",
#### Turkish
"tr",
#### Tatar
"tt",
#### Ukrainian
"uk",
#### Urdu
"ur",
#### Uzbek
"uz",
#### Vietnamese
"vi",
#### Yiddish
"yi",
#### Yoruba
"yo",
#### Chinese
"zh",
#### Cantonese
"yue",
'''
self.file = file
self.voices_folder = voices_folder
self.language = language
self.log_folder = log_folder
self.modelSize = modelSize
self.quantization = quantization
self.ACCESS_TOKEN = ACCESS_TOKEN
def whisper(self):
res = core_analysis(self.file, self.voices_folder, self.log_folder, self.language, self.modelSize, self.ACCESS_TOKEN, "whisper", self.quantization)
return res
def faster_whisper(self):
res = core_analysis(self.file, self.voices_folder, self.log_folder, self.language, self.modelSize, self.ACCESS_TOKEN, "faster-whisper", self.quantization)
return res
def custom_whisper(self, custom_model_path):
res = core_analysis(self.file, self.voices_folder, self.log_folder, self.language, self.modelSize, self.ACCESS_TOKEN, "custom", self.quantization, custom_model_path)
return res
def huggingface_model(self, hf_model_id):
res = core_analysis(self.file, self.voices_folder, self.log_folder, self.language, self.modelSize, self.ACCESS_TOKEN, "huggingface", self.quantization, None, hf_model_id)
return res
def assemby_ai_model(self, aai_api_key):
res = core_analysis(self.file, self.voices_folder, self.log_folder, self.language, self.modelSize, self.ACCESS_TOKEN, "assemblyAI", self.quantization, None, None, aai_api_key)
return res
class PreProcessor:
'''
class for preprocessing audio files.
methods:
re_encode(file) -> re-encode file to 16-bit PCM encoding
convert_to_mono(file) -> convert file from stereo to mono
mp3_to_wav(file) -> convert mp3 file to wav format
'''
def re_encode(self, file):
re_encode(file)
def convert_to_mono(self, file):
convert_to_mono(file)
def convert_to_wav(self, file):
path = convert_to_wav(file)
return path

122
speechlib/transcribe.py Normal file

@ -0,0 +1,122 @@
import torch
from .whisper_sinhala import (whisper_sinhala)
from faster_whisper import WhisperModel, BatchedInferencePipeline
import whisper
import os
from transformers import pipeline
from configparser import ConfigParser
import assemblyai as aai
config = ConfigParser()
config.read('config.ini')
modelpath = config.get('FILE', 'modelpath')
batchsize = int(config.get('FILE', 'batchsize'))
beamsize = int(config.get('FILE', 'beamsize'))
localfile = False
def transcribe(file, language, model_size, model_type, quantization, custom_model_path, hf_model_path, aai_api_key):
res = ""
if language in ["si", "Si"]:
res = whisper_sinhala(file)
return res
elif model_size in ["base", "tiny", "small", "medium", "large", "large-v1", "large-v2", "large-v3"]:
if model_type == "faster-whisper":
if modelpath != "":
model_size = modelpath
localfile = True
if torch.cuda.is_available():
if quantization:
model = WhisperModel(model_size, device="cuda", compute_type="int8_float16", local_files_only=localfile)
else:
model = WhisperModel(model_size, device="cuda", compute_type="float16", local_files_only=localfile)
else:
if quantization:
model = WhisperModel(model_size, device="cpu", compute_type="int8", local_files_only=localfile)
else:
model = WhisperModel(model_size, device="cpu", compute_type="float32", local_files_only=localfile)
batched_model = BatchedInferencePipeline(model=model)
if language in model.supported_languages:
segments, info = batched_model.transcribe(file, language=language, beam_size=beamsize, batch_size=batchsize, task="transcribe", vad_filter=True)
for segment in segments:
res += segment.text + " "
return res
else:
raise Exception("Language code not supported.\nThese are the supported languages:\n" + str(model.supported_languages))
elif model_type == "whisper":
try:
if torch.cuda.is_available():
model = whisper.load_model(model_size, device="cuda")
result = model.transcribe(file, language=language, fp16=True)
res = result["text"]
else:
model = whisper.load_model(model_size, device="cpu")
result = model.transcribe(file, language=language, fp16=False)
res = result["text"]
return res
except Exception as err:
print("an error occured while transcribing: ", err)
elif model_type == "custom":
custom_model_path = modelpath
model_folder = os.path.dirname(custom_model_path)
model_folder = model_folder + "/"
print("model file: ", custom_model_path)
print("model fodler: ", model_folder)
try:
if torch.cuda.is_available():
model = whisper.load_model(custom_model_path, download_root=model_folder, device="cuda")
result = model.transcribe(file, language=language, fp16=True)
res = result["text"]
else:
model = whisper.load_model(custom_model_path, download_root=model_folder, device="cpu")
result = model.transcribe(file, language=language, fp16=False)
res = result["text"]
return res
except Exception as err:
raise Exception(f"an error occured while transcribing: {err}")
elif model_type == "huggingface":
try:
if torch.cuda.is_available():
pipe = pipeline("automatic-speech-recognition", model=hf_model_path, device="cuda")
result = pipe(file)
res = result['text']
else:
pipe = pipeline("automatic-speech-recognition", model=hf_model_path, device="cpu")
result = pipe(file)
res = result['text']
return res
except Exception as err:
raise Exception(f"an error occured while transcribing: {err}")
elif model_type == "assemblyAI":
try:
# Replace with your API key
aai.settings.api_key = aai_api_key
# You can set additional parameters for the transcription
config = aai.TranscriptionConfig(
speech_model=aai.SpeechModel.nano,
language_code=language
)
transcriber = aai.Transcriber(config=config)
transcript = transcriber.transcribe(file)
if transcript.status == aai.TranscriptStatus.error:
print(transcript.error)
raise Exception(f"an error occured while transcribing: {transcript.error}")
else:
res = transcript.text
return res
except Exception as err:
raise Exception(f"an error occured while transcribing: {err}")
else:
raise Exception(f"model_type {model_type} is not supported")
else:
raise Exception("only 'base', 'tiny', 'small', 'medium', 'large', 'large-v1', 'large-v2', 'large-v3' models are available.")

43
speechlib/wav_segmenter.py Normal file

@ -0,0 +1,43 @@
import os
from pydub import AudioSegment
from .transcribe import (transcribe)
# segment according to speaker
def wav_file_segmentation(file_name, segments, language, modelSize, model_type, quantization, custom_model_path, hf_model_path, aai_api_key):
# Load the WAV file
audio = AudioSegment.from_file(file_name, format="wav")
trans = ""
texts = []
folder_name = "segments"
if not os.path.exists(folder_name):
os.makedirs(folder_name)
i = 0
for segment in segments:
start = segment[0] * 1000 # start time in milliseconds
end = segment[1] * 1000 # end time in milliseconds
clip = audio[start:end]
i = i + 1
file = folder_name + "/" + f"segment-{file_name}"+ str(i) + ".wav"
clip.export(file, format="wav")
try:
trans = transcribe(file, language, modelSize, model_type, quantization, custom_model_path, hf_model_path, aai_api_key)
# return -> [[start time, end time, transcript], [start time, end time, transcript], ..]
texts.append([segment[0], segment[1], trans])
except Exception as err:
print("ERROR while transcribing: ", err)
# Delete the WAV file after processing
try:
os.remove(file)
except OSError as e:
print(f'Access error on file: {e}')
return texts

8
speechlib/whisper_sinhala.py Normal file

@ -0,0 +1,8 @@
from transformers import pipeline
def whisper_sinhala(file):
pipe = pipeline("automatic-speech-recognition", model="Ransaka/whisper-tiny-sinhala-20k-8k-steps-v2")
res = pipe(file)
return res["text"]

33
speechlib/write_log_file.py Normal file

@ -0,0 +1,33 @@
import os
from datetime import datetime
def write_log_file(common_segments, log_folder, file_name, language):
if not os.path.exists(log_folder):
os.makedirs(log_folder)
#---------------------log file part-------------------------
current_time = datetime.now().strftime('%H%M%S')
file_name = os.path.splitext(os.path.basename(file_name))[0]
log_file = log_folder + "/" + file_name + "_" + current_time + "_" + language + ".txt"
lf=open(log_file,"wb")
entry = ""
for segment in common_segments:
start = segment[0]
end = segment[1]
text = segment[2]
speaker = segment[3]
if text != "" and text != None:
entry += f"{speaker} ({start} : {end}) : {text}\n"
lf.write(bytes(entry.encode('utf-8')))
lf.close()
# -------------------------log file end-------------------------