Speaker search
We are going to do speaker search. It means we want to find known speaker(s) inside a large archive of speakers and essentially answer the question: where are these known speakers speaking? Let's say we already have an audio recording with the known voice of John Doe. We also have an archive of speakers and we would like to discover which recordings from the archive likely contain John Doe's voice.
The process of speaker search consists of voiceprint extraction and N
to M
voiceprint comparison, where M
is the size of the speaker archive. Both
voiceprint extraction and comparison are explained in the following sections.
The full Python code for this example is provided at the
end of this guide. For simplicity, our N
will be 1 in this example, but you
can use analogical approach to search for multiple known speakers in the speaker
archive. Both extraction and comparison are explained in the following sections.
Attached you will find a ZIP file recordings.zip
containing a mono-channel audio file of John Doe (john_doe.wav
) speaking and a
speaker archive consisting of 8 mono-channel recordings (from unknown_01.wav
to unknown_08.wav
), therefore in this case, M = 8
.
Environment setup
We are using Python 3.9
and the Python library requests 2.27
in this
example. You can install requests
with pip
as follows:
pip install requests~=2.27
Then, you can import the following libraries (time
is built-in):
import time
import requests
Voiceprint extraction
In order to run voiceprint extraction for a single audio file, you should start
by sending a POST
request to
/api/technology/speaker-identification-voiceprint-extraction
as follows:
with open("john_doe.wav", mode="rb") as file:
files = {"file": file}
# Replace <speech-platform-server> with your actual server URL
response = requests.post(
"https://<speech-platform-server>/api/technology/speaker-identification-voiceprint-extraction",
files=files,
)
response.raise_for_status()
If the task was successfully accepted, 202 code will be returned together with a
unique task ID
in the response body. The task isn't immediately processed, but
only scheduled for processing. You can check the current task status whilst
polling for the result.
The URL for polling for the result is returned in the X-Location
header.
Alternatively, you can assemble the polling URL on your own by appending a slash
(/
) and task ID
to the initial voiceprint extraction URL.
polling_url = response.headers["x-location"]
counter = 0
while counter < 100:
response = requests.get(polling_url)
response.raise_for_status()
data = response.json()
task_status = data["task"]["state"]
if task_status in ["done", "failed", "rejected"]:
break
counter += 1
time.sleep(5)
Once the polling finishes, data
will contain the latest response from the
server -- either a response with the extracted voiceprint(s), or an error
message with details in case processing was not able to finish. The response
contains one voiceprint for each channel. Example result of a successful
voiceprint extraction from a stereo file (voiceprints were shortened for
readability):
{
"task": {"task_id": "fb9de4e5-a768-4069-aff3-c74c826f3ddf", "state": "done"},
"result": {
"channels": [
{
"channel_number": 0,
"voiceprint": "e2kDY3JjbDAWiyhpCWVtYmVkZGluZ1tkO/QWvmS8JkuGZDyv+F5kvJQzJ...",
"speech_length": 49.08,
"model": "sid-xl5",
},
{
"channel_number": 1,
"voiceprint": "e2kDY3JjbFL6NSxpCWVtYmVkZGluZ1tkO2ygcGS8CduAZDxa6ZBkPHrYf...",
"speech_length": 116.35,
"model": "sid-xl5",
},
]
},
}
Let's get back to our example with the mono audio files. In our case, the target voiceprint can be accessed as follows:
known_audio_voiceprint = data["result"]["channels"][0]["voiceprint"]
Great, you've extracted your first voiceprint and assigned it to a variable,
congratulations! Now, you can repeat the same process for the entire archive and
collect the extracted voiceprints in a list
, so it's ready for the comparison.
Let's name the list
unknown_audios_voiceprints
. The easiest way to do so is
to repeat the same steps for each unknown audio file in a for
loop and append
each extracted voiceprint to the unknown_audios_voiceprints
list. Note that
this unknown_audios_voiceprints
list should have the length of M
. Please
refer to the full Python code to see how it's done.
Voiceprint comparison
The voiceprints are extracted, so we can move on to the actual comparison. Two non-empty lists of voiceprints are expected as input for comparison. Each voiceprint is expected to be a Base64-encoded string, but you don't have to worry about it -- the voiceprints are already returned in this format from the voiceprint extraction.
Doing the voiceprint comparison is analogical to voiceprint extraction -- we
start by requesting the voiceprint comparison task to be scheduled for our two
voiceprint lists. Notice that known_audio_voiceprint
is just a string
, so it
must be placed inside a one-element list
in the request body. On the other
hand, unknown_audios_voiceprints
already is a list
, and therefore can be
used directly.
body = {
"voiceprints_a": [known_audio_voiceprint],
"voiceprints_b": unknown_audios_voiceprints,
}
response = requests.post(
"https://<speech-platform-server>/api/technology/speaker-identification-voiceprint-comparison",
json=body,
)
response.raise_for_status()
Now you can poll for the task result as it was done above for the voiceprint
extraction. The result is a comparison matrix with a shape based on the sizes of
both input voiceprint lists. In the case of our current speaker search, the
resulting matrix has the shape of 1xM
, where M
is the size of the speaker
archive. After you finish polling, the result of the comparison can be found in
data
and should look like this:
{
"task": {"task_id": "4178f672-20b4-4f79-b7eb-a871bbae4456", "state": "done"},
"result": {
"scores": {
"rows_count": 1,
"columns_count": 8,
"values": [
-4.726645469665527,
9.340583801269531,
6.426426887512207,
-5.342464447021484,
4.384160041809082,
7.261765480041504,
-5.660372257232666,
4.433615684509277,
],
}
},
}
Let's map the scores to their corresponding audio files for better readability:
audio_files_with_scores = zip(
unknown_audios, voiceprint_comparison_response["result"]["scores"]["values"]
)
for audio_file, score in audio_files_with_scores:
print(f"{audio_file}\t{score}")
It should result in the following output:
unknown_01.wav -4.726645469665527
unknown_02.wav 9.340583801269531
unknown_03.wav 6.426426887512207
unknown_04.wav -5.342464447021484
unknown_05.wav 4.384160041809082
unknown_06.wav 7.261765480041504
unknown_07.wav -5.660372257232666
unknown_08.wav -4.433615684509277
This result shows that John Doe is very likely speaking in the following files:
unknown_02.wav
, unknown_03.wav
, unknown_05.wav
, unknown_06.wav
, but not
in the rest of the speaker archive.
For more details on how scoring is handled, refer to the Scoring and conversion to percentage section in our speaker identification guide.
Full Python code
Here is the full code for this example, slightly adjusted and wrapped into functions for better readability:
import time
import requests
SPEECH_PLATFORM_SERVER = "<speech-platform-server>" # Replace with your actual server URL
def poll_result(polling_url: str, sleep: int = 5):
while True:
response = requests.get(polling_url)
response.raise_for_status()
data = response.json()
task_status = data["task"]["state"]
if task_status in ["done", "failed", "rejected"]:
break
time.sleep(sleep)
return response
def do_voiceprint_extraction(audio_path: str):
with open(audio_path, mode="rb") as file:
files = {"file": file}
response = requests.post(
f"https://{SPEECH_PLATFORM_SERVER}/api/technology/speaker-identification-voiceprint-extraction",
files=files,
)
response.raise_for_status()
polling_url = response.headers["x-location"]
voiceprint_extraction_response = poll_result(polling_url)
return voiceprint_extraction_response.json()
def do_voiceprint_comparison(voiceprints_a: list, voiceprints_b: list):
response = requests.post(
f"https://{SPEECH_PLATFORM_SERVER}/api/technology/speaker-identification-voiceprint-comparison",
json={"voiceprints_a": voiceprints_a, "voiceprints_b": voiceprints_b},
)
response.raise_for_status()
polling_url = response.headers["x-location"]
voiceprint_comparison_response = poll_result(polling_url)
return voiceprint_comparison_response.json()
known_audio = "john_doe.wav"
unknown_audios = [
"unknown_01.wav",
"unknown_02.wav",
"unknown_03.wav",
"unknown_04.wav",
"unknown_05.wav",
"unknown_06.wav",
"unknown_07.wav",
"unknown_08.wav",
]
unknown_audios_voiceprints = []
known_audio_response = do_voiceprint_extraction(known_audio)
known_audio_voiceprint = known_audio_response["result"]["channels"][0]["voiceprint"]
for unknown_audio in unknown_audios:
response = do_voiceprint_extraction(unknown_audio)
voiceprint = response["result"]["channels"][0]["voiceprint"]
unknown_audios_voiceprints.append(voiceprint)
voiceprint_comparison_response = do_voiceprint_comparison(
[known_audio_voiceprint], unknown_audios_voiceprints
)
audio_files_with_scores = zip(
unknown_audios, voiceprint_comparison_response["result"]["scores"]["values"]
)
for audio_file, score in audio_files_with_scores:
print(f"{audio_file}\t{score}")