Version: 3.4.0

Speaker Search

We are going to do a speaker search. This means we want to find known speaker(s) inside a large archive of speakers, and essentially answer the question: where are these known speakers speaking? Let's say we already have an audio recording with the known voice of John Doe. We also have an archive of speakers and we would like to discover which recordings from the archive likely contain John Doe's voice.

The process of speaker search consists of voiceprint extraction followed by an N-to-M voiceprint comparison, where N is the number of known speakers and M is the size of the speaker archive. For simplicity, our N will be 1 in this example, but you can use an analogous approach to search for multiple known speakers in the speaker archive. Both extraction and comparison are explained in the following sections, and the full Python code for this example is provided at the end of this guide.

Attached you will find a ZIP file, recordings.zip, containing a mono-channel audio file of John Doe speaking (john_doe.wav) and a speaker archive consisting of 8 mono-channel recordings (unknown_01.wav through unknown_08.wav); therefore, in this case, M = 8.

Environment setup

We are using Python 3.9 and the Python library requests 2.27 in this example. You can install requests with pip as follows:

pip install requests~=2.27

Then, you can import the following libraries (time is built-in):

import time
import requests

Voiceprint extraction

In order to run voiceprint extraction for a single audio file, you should start by sending a POST request to /api/technology/speaker-identification-voiceprint-extraction as follows:

with open("john_doe.wav", mode="rb") as file:
    files = {"file": file}
    # Replace <speech-platform-server> with your actual server URL
    response = requests.post(
        "https://<speech-platform-server>/api/technology/speaker-identification-voiceprint-extraction",
        files=files,
    )
response.raise_for_status()

If the task was successfully accepted, a 202 code will be returned together with a unique task ID in the response body. The task isn't immediately processed, but only scheduled for processing. You can check the current task status while polling for the result.

The URL for polling for the result is returned in the X-Location header. Alternatively, you can assemble the polling URL on your own by appending a slash (/) and the task ID to the initial voiceprint extraction URL.
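For instance, assembling the polling URL manually from the task ID in the response body might look like this (the task ID value here is illustrative):

```python
# Illustrative values: in practice, extraction_url is the URL you POSTed to
# and task_id comes from the body of the 202 response.
extraction_url = (
    "https://<speech-platform-server>"
    "/api/technology/speaker-identification-voiceprint-extraction"
)
task_id = "fb9de4e5-a768-4069-aff3-c74c826f3ddf"
polling_url = f"{extraction_url}/{task_id}"
print(polling_url)
```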

polling_url = response.headers["x-location"]

counter = 0
while counter < 100:
    response = requests.get(polling_url)
    response.raise_for_status()
    data = response.json()
    task_status = data["task"]["state"]
    if task_status in ["done", "failed", "rejected"]:
        break
    counter += 1
    time.sleep(5)

Once the polling finishes, data will contain the latest response from the server -- either a response with the extracted voiceprint(s), or an error message with details in case processing was not able to finish. The response contains one voiceprint for each channel. Example result of a successful voiceprint extraction from a stereo file (voiceprints were shortened for readability):

{
    "task": {"task_id": "fb9de4e5-a768-4069-aff3-c74c826f3ddf", "state": "done"},
    "result": {
        "channels": [
            {
                "channel_number": 0,
                "voiceprint": "e2kDY3JjbDAWiyhpCWVtYmVkZGluZ1tkO/QWvmS8JkuGZDyv+F5kvJQzJ...",
                "speech_length": 49.08,
                "model": "sid-xl5",
            },
            {
                "channel_number": 1,
                "voiceprint": "e2kDY3JjbFL6NSxpCWVtYmVkZGluZ1tkO2ygcGS8CduAZDxa6ZBkPHrYf...",
                "speech_length": 116.35,
                "model": "sid-xl5",
            },
        ]
    },
}
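Since data can also hold a failed or rejected task, it's worth guarding before reading the result. A minimal sketch — the helper name and the payloads below are illustrative, not part of the API:

```python
def ensure_done(data: dict) -> dict:
    """Return the task result, or raise if the task did not finish successfully."""
    state = data["task"]["state"]
    if state != "done":
        raise RuntimeError(f"Task ended in state {state!r}")
    return data["result"]

# A finished task yields its result...
result = ensure_done({"task": {"state": "done"}, "result": {"channels": []}})
# ...while a failed one raises.
try:
    ensure_done({"task": {"state": "failed"}})
except RuntimeError as exc:
    print(exc)  # Task ended in state 'failed'
```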

Let's get back to our example with the mono audio files. In our case, the target voiceprint can be accessed as follows:

known_audio_voiceprint = data["result"]["channels"][0]["voiceprint"]

Great, you've extracted your first voiceprint and assigned it to a variable, congratulations! Now you can repeat the same process for the entire archive and collect the extracted voiceprints in a list, so they're ready for the comparison. Let's name the list unknown_audios_voiceprints. The easiest way is to run the same steps for each unknown audio file in a for loop and append each extracted voiceprint to the unknown_audios_voiceprints list. Note that this list should end up with length M. Please refer to the full Python code to see how it's done.
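That loop might be sketched as follows, where extract_voiceprint is a hypothetical helper wrapping the POST-and-poll sequence shown above (stubbed out here so the sketch runs standalone):

```python
def extract_voiceprint(path: str) -> str:
    # Stub: in the real flow this POSTs the file, polls for the result,
    # and returns data["result"]["channels"][0]["voiceprint"].
    return f"<voiceprint for {path}>"

unknown_audios = [f"unknown_{i:02d}.wav" for i in range(1, 9)]
unknown_audios_voiceprints = [extract_voiceprint(p) for p in unknown_audios]
print(len(unknown_audios_voiceprints))  # 8, i.e. M
```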

Voiceprint comparison

The voiceprints are extracted, so we can move on to the actual comparison. Two non-empty lists of voiceprints are expected as input for comparison. Each voiceprint is expected to be a Base64-encoded string, but you don't have to worry about it — the voiceprints are already returned in this format from the voiceprint extraction.
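If you ever assemble voiceprints from another source, a quick Base64 sanity check can catch malformed strings before sending a request. This check is optional and not part of the API; the helper name is ours:

```python
import base64
import binascii

def looks_like_voiceprint(value: str) -> bool:
    """Rough sanity check: the string is non-empty and valid Base64."""
    try:
        return bool(value) and bool(base64.b64decode(value, validate=True))
    except binascii.Error:
        return False

print(looks_like_voiceprint("e2kDY3JjbA=="))  # True: well-formed Base64
print(looks_like_voiceprint("not base64!"))   # False
```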

Doing the voiceprint comparison is analogous to voiceprint extraction -- we start by requesting the voiceprint comparison task to be scheduled for our two voiceprint lists. Notice that known_audio_voiceprint is just a string, so it must be placed inside a one-element list in the request body. On the other hand, unknown_audios_voiceprints already is a list, and therefore can be used directly.

body = {
    "voiceprints_a": [known_audio_voiceprint],
    "voiceprints_b": unknown_audios_voiceprints,
}
response = requests.post(
    "https://<speech-platform-server>/api/technology/speaker-identification-voiceprint-comparison",
    json=body,
)
response.raise_for_status()

Now you can poll for the task result just as you did for the voiceprint extraction. The result is a comparison matrix whose shape is based on the sizes of both input voiceprint lists. In the case of our current speaker search, the resulting matrix has the shape 1xM, where M is the size of the speaker archive. After you finish polling, the result of the comparison can be found in data and should look like this:

{
    "task": {"task_id": "4178f672-20b4-4f79-b7eb-a871bbae4456", "state": "done"},
    "result": {
        "scores": {
            "rows_count": 1,
            "columns_count": 8,
            "values": [
                -4.726645469665527,
                9.340583801269531,
                6.426426887512207,
                -5.342464447021484,
                4.384160041809082,
                7.261765480041504,
                -5.660372257232666,
                -4.433615684509277,
            ],
        }
    },
}
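The values list is the score matrix flattened row by row; with more than one known speaker, you can recover the rows from rows_count and columns_count. A small sketch using truncated scores:

```python
scores = {
    "rows_count": 1,
    "columns_count": 8,
    "values": [-4.73, 9.34, 6.43, -5.34, 4.38, 7.26, -5.66, -4.43],
}
rows, cols = scores["rows_count"], scores["columns_count"]
# Slice the flat list into one sublist per known speaker (row).
matrix = [scores["values"][r * cols : (r + 1) * cols] for r in range(rows)]
print(matrix[0][1])  # 9.34: score of known speaker 0 against archive file 2
```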

Let's map the scores to their corresponding audio files for better readability:

audio_files_with_scores = zip(
    unknown_audios, data["result"]["scores"]["values"]
)
for audio_file, score in audio_files_with_scores:
    print(f"{audio_file}\t{score}")

It should result in the following output:

unknown_01.wav	-4.726645469665527
unknown_02.wav	9.340583801269531
unknown_03.wav	6.426426887512207
unknown_04.wav	-5.342464447021484
unknown_05.wav	4.384160041809082
unknown_06.wav	7.261765480041504
unknown_07.wav	-5.660372257232666
unknown_08.wav	-4.433615684509277

This result shows that John Doe is very likely speaking in the following files: unknown_02.wav, unknown_03.wav, unknown_05.wav, unknown_06.wav, but not in the rest of the speaker archive.
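Assuming, as this example does, that positive scores indicate a likely match (see the scoring guide referenced below for the exact interpretation), a shortlist can be produced like this (scores truncated):

```python
files = [f"unknown_{i:02d}.wav" for i in range(1, 9)]
scores = [-4.73, 9.34, 6.43, -5.34, 4.38, 7.26, -5.66, -4.43]
# Keep only the archive files whose score crosses the assumed threshold of 0.
likely_matches = [f for f, s in zip(files, scores) if s > 0]
print(likely_matches)
# ['unknown_02.wav', 'unknown_03.wav', 'unknown_05.wav', 'unknown_06.wav']
```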

For more details on how scoring is handled, refer to the Scoring and conversion to percentage section in our speaker identification guide.

Full Python code

Here is the full code for this example, slightly adjusted and wrapped into functions for better readability:

import time

import requests

SPEECH_PLATFORM_SERVER = "<speech-platform-server>"  # Replace with your actual server URL


def poll_result(polling_url: str, sleep: int = 5):
    while True:
        response = requests.get(polling_url)
        response.raise_for_status()
        data = response.json()
        task_status = data["task"]["state"]
        if task_status in ["done", "failed", "rejected"]:
            break
        time.sleep(sleep)
    return response


def do_voiceprint_extraction(audio_path: str):
    with open(audio_path, mode="rb") as file:
        files = {"file": file}
        response = requests.post(
            f"https://{SPEECH_PLATFORM_SERVER}/api/technology/speaker-identification-voiceprint-extraction",
            files=files,
        )
    response.raise_for_status()
    polling_url = response.headers["x-location"]
    voiceprint_extraction_response = poll_result(polling_url)
    return voiceprint_extraction_response.json()


def do_voiceprint_comparison(voiceprints_a: list, voiceprints_b: list):
    response = requests.post(
        f"https://{SPEECH_PLATFORM_SERVER}/api/technology/speaker-identification-voiceprint-comparison",
        json={"voiceprints_a": voiceprints_a, "voiceprints_b": voiceprints_b},
    )
    response.raise_for_status()
    polling_url = response.headers["x-location"]
    voiceprint_comparison_response = poll_result(polling_url)
    return voiceprint_comparison_response.json()


known_audio = "john_doe.wav"
unknown_audios = [
    "unknown_01.wav",
    "unknown_02.wav",
    "unknown_03.wav",
    "unknown_04.wav",
    "unknown_05.wav",
    "unknown_06.wav",
    "unknown_07.wav",
    "unknown_08.wav",
]
unknown_audios_voiceprints = []

known_audio_response = do_voiceprint_extraction(known_audio)
known_audio_voiceprint = known_audio_response["result"]["channels"][0]["voiceprint"]

for unknown_audio in unknown_audios:
    response = do_voiceprint_extraction(unknown_audio)
    voiceprint = response["result"]["channels"][0]["voiceprint"]
    unknown_audios_voiceprints.append(voiceprint)

voiceprint_comparison_response = do_voiceprint_comparison(
    [known_audio_voiceprint], unknown_audios_voiceprints
)

audio_files_with_scores = zip(
    unknown_audios, voiceprint_comparison_response["result"]["scores"]["values"]
)
for audio_file, score in audio_files_with_scores:
    print(f"{audio_file}\t{score}")