Туториал по распознаванию речи

Подготовка

В данном туториале будем запускать примеры из репозитория с примерами.

Для авторизации в сервисах получим ключи здесь.
Дальше выставляем эти ключи в переменные среды:

1
2
export VOICEKIT_API_KEY="PUT_YOUR_API_KEY_HERE"
export VOICEKIT_SECRET_KEY="PUT_YOUR_SECRET_KEY_HERE"

Клонируем репозиторий с примерами:

1
git clone --recursive https://github.com/TinkoffCreditSystems/voicekit-examples.git

Устанавливаем зависимости:

1
2
sudo apt-get install python3 python-pyaudio python3-pyaudio
sudo python3 -m pip install -r requirements/all.txt

Методы совершения запросов

Для распознавания речи в сервисе реализовано 3 метода:

Метод "Recognize" работает по принципу "загружаем аудио целиком, получаем ответ", обычно полезен только для распознавания аудиофайлов (поэтому здесь будет только один пример).

Метод LongRunningRecognize работает по принципу "отправляем аудио целиком, а результат - когда он будет готов - забираем из отдельного интерфейса".

Остальные примеры туториала посвящены методу "StreamingRecognize".
Этот метод нужен для распознавания речи в реальном времени: телефонных звонков, голосовых ассистентов и т.п.
Для распознавания файлов в методе "StreamingRecognize" так же больше возможностей.

Метод Recognize

Пример 1: метод Recognize() для LINEAR16
1
$ ./stt_recognize_linear16_raw.py

Самый простой режим запрос-ответ: загружаем аудио целиком, получаем ответ.

В этом примере так же загружаем сэмплы из "сырого" формата

.s16
(формат не содержит метаинформации в отличие от wav).
В нашем случае работаем с одноканальным аудио с частотой дискретизации 16 KHz.

Импортируем модули:

1
2
3
4
from tinkoff.cloud.stt.v1 import stt_pb2_grpc, stt_pb2 # сообщения и стабы gRPC API
from auth import authorization_metadata # для авторизации по JWT
import grpc
import os

Получаем конфигурацию:

1
2
3
4
# можно получать из переменных среды, либо заменить в сниппете
endpoint = os.environ.get("VOICEKIT_ENDPOINT") or "stt.tinkoff.ru:443"
api_key = os.environ["VOICEKIT_API_KEY"]
secret_key = os.environ["VOICEKIT_SECRET_KEY"]

Реализуем создание запроса:

1
2
3
4
5
6
7
8
def build_request():
    request = stt_pb2.RecognizeRequest()
    with open("../audio/sample_3.s16", "rb") as f:
        request.audio.content = f.read()
    request.config.encoding = stt_pb2.AudioEncoding.LINEAR16
    request.config.sample_rate_hertz = 16000 # Значение не содержится в файле ".s16"
    request.config.num_channels = 1 # Значение не содержится в файле ".s16"
    return request

Реализуем печать ответа:

1
2
3
4
5
6
7
8
def print_recognition_response(response):
    for result in response.results:
        print("Channel", result.channel)
        print("Phrase start:", result.start_time.ToTimedelta())
        print("Phrase end:  ", result.end_time.ToTimedelta())
        for alternative in result.alternatives:
            print('"' + alternative.transcript + '"')
        print("----------------------------")

Производим запрос:

1
2
3
4
stub = stt_pb2_grpc.SpeechToTextStub(grpc.secure_channel(endpoint, grpc.ssl_channel_credentials()))
metadata = authorization_metadata(api_key, secret_key, "tinkoff.cloud.stt")
response = stub.Recognize(build_request(), metadata=metadata)
print_recognition_response(response)

Исходный код примера

Метод Streaming Recognize

Кодеки и форматы файлов

Пример 2: переходим на метод StreamingRecognize()
1
$ ./stt_streaming_recognize_linear16_raw.py

В режиме StreamingRecognize() мы можем слать аудио по частям.
Этот режим нас интересует для распознавания аудио в реальном времени.

Первым запросом должно идти сообщение "streaming_config", а все последующие - "audio_content".

Реализуем создание первого запроса:

1
2
3
4
5
6
def build_first_request():
    request = stt_pb2.StreamingRecognizeRequest()
    request.streaming_config.config.encoding = stt_pb2.AudioEncoding.LINEAR16
    request.streaming_config.config.sample_rate_hertz = 16000
    request.streaming_config.config.num_channels = 1
    return request

Реализуем генератор запросов:

1
2
3
4
5
6
7
8
9
10
11
def generate_requests():
    try:
        yield build_first_request()
        with open("../audio/sample_3.s16", "rb") as f:
            for data in iter(lambda:f.read(3200), b''): # Шлём по 100 миллисекунд (2 байта на сэмпл)
                request = stt_pb2.StreamingRecognizeRequest()
                request.audio_content = data
                yield request
    except Exception as e: # обрабатываем исключение внутри генератора
        print("Got exception in generate_requests", e)
        raise

Реализуем печать ответов:

1
2
3
4
5
6
7
8
9
def print_streaming_recognition_responses(responses):
    for response in responses:
        for result in response.results:
            print("Channel", result.recognition_result.channel)
            print("Phrase start:", result.recognition_result.start_time.ToTimedelta())
            print("Phrase end:  ", result.recognition_result.end_time.ToTimedelta())
            for alternative in result.recognition_result.alternatives:
                print('"' + alternative.transcript + '"')
            print("------------------")

Производим запрос:

1
2
3
4
stub = stt_pb2_grpc.SpeechToTextStub(grpc.secure_channel(endpoint, grpc.ssl_channel_credentials()))
metadata = authorization_metadata(api_key, secret_key, "tinkoff.cloud.stt")
responses = stub.StreamingRecognize(generate_requests(), metadata=metadata)
print_streaming_recognition_responses(responses)

Исходный код примера

Пример 3: загружаем аудио из ".wav" файла
1
$ ./stt_streaming_recognize_linear16_wav.py

Попробуем загрузить аудио из контейнера RIFF WAVE.
Пример работает с WAV файлом с аудио в формате PCM.

Импортируем модуль

wave
:

1
import wave

Пробрасываем конфигурацию аудио в первый запрос:

1
2
3
4
5
6
def build_first_request(sample_rate_hertz, num_channels):
    request = stt_pb2.StreamingRecognizeRequest()
    request.streaming_config.config.encoding = stt_pb2.AudioEncoding.LINEAR16
    request.streaming_config.config.sample_rate_hertz = sample_rate_hertz
    request.streaming_config.config.num_channels = num_channels
    return request

Теперь читаем сэмплы вместо байтов:

1
2
3
4
        with wave.open("../audio/sample_3.wav") as f:
            yield build_first_request(f.getframerate(), f.getnchannels())
            frame_samples = f.getframerate()//10 # Шлём по 100 миллисекунд
            for data in iter(lambda:f.readframes(frame_samples), b''):

Исходный код примера

Пример 4: используем кодек A-Law
1
$ ./stt_streaming_recognize_alaw_raw.py

Поработаем с A-Law - кодеком с квази-экспоненциальным кодированием амплитуды аудиоимпульса.
Это позволяет кодировать аудио по 1 байту за сэмпл с сохранением приемлемого качества в отличие от линейного кодирования.
Можно считать его очень производительным (с точки зрения нагрузки на CPU) форматом сжатия с потерями, т. к. сжатие сводится к независимому преобразованию сэмплов.

В основном применяется в телефонии. В нашем примере, как и в типичной конфигурации VoIP, используется частота в 8 KHz.

(Мы так же поддерживаем формат Mu-Law, но, поскольку он является устаревшим аналогом A-Law с менее эффективным распределением кодируемых значений амплитуды аудиоимпульса, его использование оправдано только поддержкой совместимости.)

Логика похожа на Пример 2

Задаём кодек и частоту дискретизации:

1
2
    request.streaming_config.config.encoding = stt_pb2.AudioEncoding.ALAW
    request.streaming_config.config.sample_rate_hertz = 8000

Адаптируем размер фрэйма в байтах:

1
            for data in iter(lambda:f.read(800), b''): # Шлём по 100 миллисекунд (1 байт на сэмпл)

Исходный код примера

Пример 5: распознаём MP3
1
$ ./stt_streaming_recognize_mp3.py

MP3 поддерживается в формате контейнера с фиксироанными количеством каналов и частотой дискретизации.

Меняем Пример 3

Импортируем модуль для чтения метаинформации из MP3:

1
from mutagen.mp3 import MP3

Задаём формат кодирования:

1
    request.streaming_config.config.encoding = stt_pb2.AudioEncoding.MPEG_AUDIO

Заполняем метаинформацию и отправляем аудио (шлём по 4096 байт за раз, но в реальности значение нужно выбирать исходя из условий использования):

1
2
3
4
5
        fname = "../audio/sample_3.mp3"
        info = MP3(fname).info
        yield build_first_request(info.sample_rate, info.channels)
        with open(fname, "rb") as f:
            for data in iter(lambda:f.read(4096), b''): # Шлём по 4096 байт за раз

Исходный код примера

Пример 6: кодируем в Opus на лету
1
$ ./stt_streaming_recognize_raw_opus_from_wav.py

В этом примере читаем ".wav" файл и кодируем в сырые Opus пакеты.

На практике сжатие на лету удобно применять при работе с аудио разговорами в реальном времени, при распознавании с микрофона и т. п.
На сегодняшний день Opus - наиболее эффективный кодек сжатия с потерями: по своим характеристикам он обходит как кодеки для разговоров (с небольшой задержкой декодирования), так и кодеки для хранения аудиотрэков (сжатые большими блоками для большего сжатия).

Задаём формат кодирования:

1
    request.streaming_config.config.encoding = stt_pb2.AudioEncoding.RAW_OPUS

В режиме RAW_OPUS важно отправлять ровно по одному Opus фрэйму в запрос, т. к. сами фрэймы не содержат информации о длине фрэйма.
Например, если сконкатенировать два фрэйма в один, невозможно будет понять, где заканчивается первый фрэйм, и где начинается второй.
Для хранения Opus фрэймов в файле упаковкой занимаются форматы контейнеров, например, Ogg (не путать с аудиокодеком Vorbis).

Реализуем проверку частоты дискретизации на допустимое значение:

1
2
def frame_rate_is_valid(frame_rate):
    return frame_rate in [8000, 12000, 16000, 24000, 48000]

Реализуем вычисление оптимального размера выровненного фрэйма для последнего фрагмента аудио:

1
2
3
4
5
6
def get_padded_frame_size(frame_samples, frame_rate):
    for duration_half_msec in [5, 10, 20, 40, 80, 120]: # Допустимые размеры фрэймов: 2.5, 5, 10, 20, 40 или 60 мсек.
        padded_samples = frame_rate//2000*duration_half_msec
        if frame_samples <= padded_samples:
            return padded_samples
    raise("Unexpected frame samples")

Инициализируем состояние энкодера в Opus и кодируем фрэймами фиксированного размера (кроме последнего):

1
2
3
4
5
6
7
8
9
10
11
        with wave.open("../audio/sample_3.wav") as f:
            frame_rate_is_valid(f.getframerate())
            yield build_first_request(f.getframerate(), f.getnchannels())
            frame_samples = f.getframerate()//1000*60 # 60 мсек.
            opus_encoder = opuslib.Encoder(f.getframerate(), f.getnchannels(), opuslib.APPLICATION_AUDIO)
            for data in iter(lambda:f.readframes(frame_samples), b''): # Отправляем 60 мсек. за раз
                if len(data) < frame_samples*2: # Расширяем фрэйм до ближайшего допустимого размера
                    data = data.ljust(get_padded_frame_size(frame_samples, f.getframerate())*2, b'\0')
                request = stt_pb2.StreamingRecognizeRequest()
                request.audio_content = opus_encoder.encode(data, len(data) >> 1)
                yield request

Исходный код примера

Ввод с микрофона

Пример 7: распознаём с микрофона
1
$ ./stt_streaming_recognize_linear16_from_microphone.py

Для удобства тестирования сервиса можно делать запросы непосредственно с микрофона.

Импортируем модуль:

1
import pyaudio

Инициализируем запись с микрофона и отправляем запросы:

1
2
3
4
5
6
7
8
        sample_rate_hertz, num_channels = 16000, 1
        pyaudio_lib = pyaudio.PyAudio()
        f = pyaudio_lib.open(input=True, channels=num_channels, format=pyaudio.paInt16, rate=sample_rate_hertz)
        yield build_first_request(sample_rate_hertz, num_channels)
        for data in iter(lambda:f.read(800), b''): # Шлём по 50ms за раз
            request = stt_pb2.StreamingRecognizeRequest()
            request.audio_content = data
            yield request

Исходный код примера

Режимы и опции распознавания

Пример 8: получаем несколько гипотез
1
$ ./stt_streaming_recognize_max_alternatives.py

Сервис может генерировать несколько предположений о том, что хотел сказать автор.
Попробуем их получить.

Задаём количество желаемых гипотез:

1
    request.streaming_config.config.max_alternatives = 3

Адаптируем вывод результата:

1
2
3
4
5
            print("Alternatives: [")
            for alternative in result.recognition_result.alternatives:
                print('    "' + alternative.transcript + '",')
            print("]")
            print("------------------")

Исходный код примера

Пример 9: получаем промежуточные гипотезы
1
$ ./stt_streaming_recognize_interim_results.py

Т. к., сервис распознаёт текст на лету, можно получать промежуточные версии на лету.
Чаще всего это применяется при надиктовывании текста в чат.

Включаем промежуточные гипотезы:

1
    request.streaming_config.interim_results_config.enable_interim_results = True

Подсуетимся о красивом выравнивании таймштампов:

1
2
3
4
5
def format_time_stamp(timestamp):
    return "{:>02d}:{:>02d}:{:>02d}.{:>03d}".format(timestamp.seconds//(60*60), (timestamp.seconds//60)%60, timestamp.seconds%60, timestamp.microseconds//1000)

def time_range(recognition_result):
    return "[" + format_time_stamp(recognition_result.start_time.ToTimedelta()) + " .. " + format_time_stamp(recognition_result.end_time.ToTimedelta()) + "]"

Теперь нам нужно различать финальные и промежуточные гипотезы - эту информацию мы возьмём из флага

is_final
.
Кроме того, важно понимать, к какой фразе относится промежуточная гипотеза - изобразим это наглядно:

1
2
3
4
5
6
7
8
9
10
            if not inside_phrase:
                print("[Phrase begin]")
                inside_phrase = True
            assert(len(result.recognition_result.alternatives) == 1) # Handle carefully at real service
            if result.is_final:
                print("Final result:   " + time_range(result.recognition_result) + " \"" + result.recognition_result.alternatives[0].transcript + "\"")
                print("[Phrase end]")
                inside_phrase = False
            else:
                print("Interim result: " + time_range(result.recognition_result) + " \"" + result.recognition_result.alternatives[0].transcript + "\"")

Исходный код примера

Пример 10: кастомизируем VAD (Voice Activity Detection)
1
$ ./stt_streaming_recognize_vad_customization.py

Иногда возникает необходимость переопределить настройки определения завершения фразы.
Например, для изменения отзывчивости телефонного робота-опросника.

В реальном поведении также участвуют характеристики модели и внутрее устройство сервиса.

В данном примере повышением

silence_duration_threshold
добьёмся того, что 2 фразы сольются в одну.
Заодно потюним и другие настройки:

1
2
3
4
    request.streaming_config.config.vad_config.min_speech_duration = 1.0
    request.streaming_config.config.vad_config.max_speech_duration = 30.0
    request.streaming_config.config.vad_config.silence_duration_threshold = 3.0
    request.streaming_config.config.vad_config.silence_prob_threshold = 0.2

Исходный код примера

Пример 11: режим завершения после первой фразы
1
$ ./stt_streaming_recognize_single_utterance.py

Этот режим типичен в ситуациях, когда нет желания анализировать диалог целиком - хочется упростить логику сервиса.
Например, в голосовом ассистенте: чтобы не следить за тем, что пользователь наговорил между запросами, а новый запрос начать распознавать с чистого листа.

Выставим соответствующий флаг:

1
    request.streaming_config.single_utterance = True

Исходный код примера

Пример 12: выключаем VAD
1
$ ./stt_streaming_recognize_vad_disabled.py

Если выключить VAD, мы распознаем аудиофрагмент как одну фразу.

Выставим соответствующий флаг:

1
    request.streaming_config.config.do_not_perform_vad = True

Исходный код примера

Пример 13: включаем пунктуацию
1
$ ./stt_streaming_recognize_enable_automatic_punctuation.py

Воспользуемся автоматической пунктуацией сервиса.

Выставим соответствующий флаг:

1
    request.streaming_config.config.enable_automatic_punctuation = True

Исходный код примера

Метод LongRunningRecognize

Пример 14: Загрузка аудио для отложенной обработки и получение результата через циклический опрос
1
$ ../stt_long_running_recognize_single_audio.py

Метод LongRunningRecognize принимает на вход параметры аналогичные Recognize, однако возвращает не результат распознавания, а идентификатор операции, за ходом обработки которой можно следить.
Такой режим нас может интересовать для распознавания, результаты которого не требуются немедленно.

Генерация запроса аналогична генерации RecognizeRequest за исключением названия метода.

1
2
3
4
5
6
7
8
def build_recognize_request():
    request = stt_pb2.LongRunningRecognizeRequest()
    with open("../../audio/sample_3.s16", "rb") as f:
        request.audio.content = f.read()
    request.config.encoding = stt_pb2.AudioEncoding.LINEAR16
    request.config.sample_rate_hertz = 16000  # Not stored at raw ".s16" file
    request.config.num_channels = 1  # Not stored at raw ".s16" file
    return request

Метод LongRunningRecognize возвращает объект операции - это объект, содержащий информацию о текущем статусе обработки задачи. Функция ниже демонстрирует как можно получить разную информацию о текущем состоянии операции.

1
2
3
4
5
6
7
8
9
10
def print_longrunning_operation(operation):
    print("Operation id:", operation.id)
    print("State:", OperationState.Name(operation.state))
    if operation.state == DONE:
        response = stt_pb2.RecognizeResponse()
        operation.response.Unpack(response)
        print_recognition_response(response)
    if operation.state == FAILED:
        print("Error:", operation.error)
    print("============================")

Для получения информации по статусам операций используется отдельный сервис - Operations. Следует обратить внимание, что в метаданных следует указывать поле AUD, соответствующее этому сервису:

1
2
operations_stub = longrunning_pb2_grpc.OperationsStub(grpc.secure_channel(endpoint, grpc.ssl_channel_credentials()))
operations_metadata = authorization_metadata(api_key, secret_key, "tinkoff.cloud.longrunning")

Наивный способ дождаться выполнения операции - это опрос статуса отдельной в цикле.
Этот подход не является оптимальным, но удобен для того, чтобы посмотреть как система работает. Сгенерируем запрос для этого.

1
2
3
4
def build_get_operation_request(id):
    request = longrunning_pb2.GetOperationRequest()
    request.id = id
    return request

Осуществляем запрос

1
2
3
4
while operation.state != FAILED and operation.state != DONE:
    time.sleep(1)
    operation = operations_stub.GetOperation(build_get_operation_request(operation.id), metadata=operations_metadata)
    print_longrunning_operation(operation)

Исходный код примера

Пример 15: Загрузка пачки аудио для отложенной обработки и получение результата через нотификации
1
$ ./stt_long_running_recognize_audio_group.py

Вместо опроса API в цикле и запоминания идентификатора каждой операции можно упаковать их в группы и получать результаты обработки через метод WatchOperations.

Для того, чтобы пачки аудио, залитые при повторных запусках примера, не путались друг с другом, поместим их в группы. В качестве имени группы можно использовать произвольную строку. Для примера используем текущую дату.

1
group_name = datetime.now().strftime("test-group-%Y-%m-%d, %H:%M:%S")

При формировании запроса на распознавание нужно указать группу, в которую операция должна быть помещена.

1
2
3
4
5
6
7
8
9
10
def build_recognize_request(file_path, group_name):
    request = stt_pb2.LongRunningRecognizeRequest()
    # Note: setting the group name here allows us filtering by group name in WatchOperations
    request.group = group_name
    with open(file_path, "rb") as f:
        request.audio.content = f.read()
    request.config.encoding = stt_pb2.AudioEncoding.LINEAR16
    request.config.sample_rate_hertz = 48000  # Not stored at raw ".s16" file
    request.config.num_channels = 1  # Not stored at raw ".s16" file
    return request

В качестве пачки аудио будем использовать все файлы, содержащиеся в каталоге.

1
2
3
4
5
6
7
8
9
10
audio_folder = "../../audio/sample_group"

# Send audio files for recognition
stt_stub = stt_pb2_grpc.SpeechToTextStub(grpc.secure_channel(endpoint, grpc.ssl_channel_credentials()))
stt_metadata = authorization_metadata(api_key, secret_key, "tinkoff.cloud.stt")
created_operations = 0
for test_file in os.listdir(audio_folder):
    file_path = join(audio_folder, test_file)
    stt_stub.LongRunningRecognize(build_recognize_request(file_path, group_name), metadata=stt_metadata)
    created_operations += 1

Реализуем вспомогательные методы для вывода состояния операций в консоль.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def print_longrunning_operations(operations):
    for operation in operations:
        print(f"[{operation.id}] {get_recognition_state_description(operation)}")
    print("============================")


def get_recognition_state_description(operation):
    if operation.state == DONE:
        response = stt_pb2.RecognizeResponse()
        operation.response.Unpack(response)

        return " ".join([result.alternatives[0].transcript for result in response.results])
    if operation.state == FAILED:
        return operation.error

    return OperationState.Name(operation.state)

Подготовим запрос для вызова WatchOperations. Он содержит информацию о том, какие операции должны возвращаться, а также признак того, нужно ли после отправки текущего состояния операций переходить в режим нотификаций об изменениях.

1
2
3
4
5
6
7
def build_watch_operations_request(group_name):
    request = longrunning_pb2.WatchOperationsRequest()
    request.filter.exact_group = group_name
    # Note: listen_for_updates is set to False by default.
    # Setting it to True here is required for update notifications to be sent.
    request.listen_for_updates = True
    return request

Вызываем WatchOperations, который позволяет осуществлять фильтрацию операций, входящих в определённую группу.
Ниже продемонстрирован способ ожидания окончания обработки операций и обработка статусов.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def count_finished_operations(operations):
    return sum([int(operation.state == DONE) for operation in operations])

print(f"Watching operations in group '{group_name}'")
responses = operations_stub.WatchOperations(build_watch_operations_request(group_name), metadata=operations_metadata)
finished_operations = 0
for response in responses:
    if response.HasField("initial_state"):
        finished_operations += count_finished_operations(response.initial_state.operations)
        print("WatchOperations. Initial state:")
        print_longrunning_operations(response.initial_state.operations)
    elif response.HasField("init_finished"):
        print("WatchOperations. Init finished.")
    else:
        assert response.HasField("update")
        finished_operations += count_finished_operations(response.update.operations)
        print("WatchOperations. Update:")
        print_longrunning_operations(response.update.operations)

    if finished_operations == created_operations:
        break
print("Done.")

Исходный код примера