
Build a Real-Time AI Emergency Voice Agent with LangChain

We have all been in an emergency where every second counts. A person’s life is at risk, and you are panicking. Now imagine that, in this stressful situation, a helpline asks you to press numbers on your keypad to reach the right agent. Pure chaos, right? What you need is someone who listens, acts quickly, and stays on the line instead of transferring you again and again.

In this blog, we will tackle this challenge by building our own AI emergency helpline voice agent. The agent listens to the caller’s distress, assesses the situation, dispatches the appropriate emergency service, and keeps the caller calm, all in real time and entirely by voice.

No typing. No menus. Just talk.

Why an Emergency Helpline?

Perhaps the most common uses of voice assistants today are ordering food and streaming music. These everyday use cases are low-stakes: a poor user experience does no real harm and is easily forgotten. An emergency helpline is completely different.

In this use case, latency is critical, the agent’s tone matters, triage decisions can affect who gets help first, and you cannot fall back on another channel to dispatch an emergency vehicle such as an ambulance. Every design decision in this pipeline can therefore have real consequences, which makes it a valuable use case to learn from.

How Does the Pipeline Work?

The sandwich architecture consists of three independent stages that are designed to work together: speech-to-text (STT), a reasoning agent, and text-to-speech (TTS). Each stage streams its output, so the next stage can start working before the previous one has finished:

  • transcription starts while the caller is still mid-sentence,
  • the agent starts reasoning over what has already been transcribed while the caller finishes speaking,
  • text-to-speech starts synthesizing the beginning of the response while the agent is still generating the rest.

If everything is wired correctly, the whole round trip completes in well under a second. Because every stage streams, audio plays back continuously, without gaps.
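To make the overlap concrete, here is a minimal sketch that chains three toy stages as async generators. The stage names (fake_audio, fake_stt, fake_tts) and the log list are illustrative assumptions, not part of the real pipeline, and the toy stages are pulled lazily one chunk at a time rather than running as separate concurrent tasks. The sketch only shows that the first chunk reaches the last stage before the second chunk has been produced.

```python
import asyncio
from typing import AsyncIterator, List

log: List[str] = []  # records the order in which the stages do their work


async def fake_audio() -> AsyncIterator[str]:
    """Stand-in for the microphone: emits one word per audio chunk."""
    for chunk in ["help", "my", "husband"]:
        log.append(f"audio:{chunk}")
        yield chunk
        await asyncio.sleep(0)  # hand control back to the event loop


async def fake_stt(audio: AsyncIterator[str]) -> AsyncIterator[str]:
    """Stand-in for speech-to-text: upper-cases each chunk."""
    async for chunk in audio:
        log.append(f"stt:{chunk}")
        yield chunk.upper()


async def fake_tts(text: AsyncIterator[str]) -> AsyncIterator[str]:
    """Stand-in for text-to-speech: wraps each word in a fake audio tag."""
    async for word in text:
        log.append(f"tts:{word}")
        yield f"<audio {word}>"


async def run_pipeline() -> List[str]:
    return [out async for out in fake_tts(fake_stt(fake_audio()))]


outputs = asyncio.run(run_pipeline())
print(log[:4])
```

The first four log entries show the first chunk passing through all three stages before the second audio chunk appears, which is the property the real pipeline relies on for low latency.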

Getting Started with the Voice Agent

You will need API keys for AssemblyAI (real-time STT) and OpenAI (both the agent’s brain and TTS). Using OpenAI for TTS as well keeps the agent and speech synthesis on a single provider and a single API key.

Install the required libraries (langchain-openai is needed for the "openai:" model string used later):

pip install langchain langchain-openai langgraph assemblyai websockets fastapi uvicorn openai

Set the environment variables:

export ASSEMBLYAI_API_KEY="your_key"
export OPENAI_API_KEY="your_key"
export LANGSMITH_TRACING="true"
export LANGSMITH_API_KEY="your_key" 

Enable LangSmith tracing so that every conversation between your agent and a caller is recorded and can be audited later, for example when a call is reviewed. An audit trail supports compliance and debugging by documenting what your agent said and when.
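Since a missing key would otherwise only surface in the middle of a call, it may help to check the configuration at startup. The following is a small sketch; the helper name missing_keys and the REQUIRED_KEYS tuple are assumptions made for illustration.

```python
import os
from typing import List, Mapping

# The three secrets exported above; LANGSMITH_TRACING is a flag, not a secret.
REQUIRED_KEYS = ("ASSEMBLYAI_API_KEY", "OPENAI_API_KEY", "LANGSMITH_API_KEY")


def missing_keys(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variable names that are unset or blank."""
    return [name for name in REQUIRED_KEYS if not env.get(name, "").strip()]


# Example with an explicit mapping instead of the real environment
print(missing_keys({"OPENAI_API_KEY": "your_key"}))
# → ['ASSEMBLYAI_API_KEY', 'LANGSMITH_API_KEY']
```

Calling missing_keys() with no argument checks the real process environment, so the server can refuse to start when the returned list is not empty.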

Stage 1: Speech-to-Text with AssemblyAI

The STT stage transcribes the caller’s voice live. We use AssemblyAI’s WebSocket API in a producer-consumer pattern: audio chunks go in and transcripts come out concurrently.

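from typing import AsyncIterator
import asyncio
import contextlib

# AssemblyAISTT and VoiceAgentEvent are project helpers that this article
# does not show; they are assumed to be defined elsewhere in the codebase.


async def stt_stream(
    audio_stream: AsyncIterator[bytes],
) -> AsyncIterator[VoiceAgentEvent]:
    stt = AssemblyAISTT(sample_rate=16000)

    async def send_audio():
        # Producer: forward caller audio to AssemblyAI as it arrives
        try:
            async for chunk in audio_stream:
                await stt.send_audio(chunk)
        finally:
            await stt.close()

    send_task = asyncio.create_task(send_audio())

    # Consumer: yield transcript events while the producer keeps sending
    try:
        async for event in stt.receive_events():
            yield event
    finally:
        # Stop the producer, then release the connection; this assumes
        # close() is safe to call a second time
        send_task.cancel()

        with contextlib.suppress(asyncio.CancelledError):
            await send_task

        await stt.close()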

Two event types matter here: partial-transcript events (stt_chunk) and final-transcript events (stt_output). An stt_chunk event carries a partial transcript generated while the caller is still speaking, which lets a human supervisor monitor the conversation in real time. An stt_output event carries the final, punctuated text that the agent acts on.

When using AssemblyAI for a helpline, you should enable its content safety detection. It flags distress signals in the transcript metadata before the agent processes the text, giving the agent more time to decide on the right response.
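The code in this article passes VoiceAgentEvent objects between stages without showing how they are defined. Below is one possible shape, inferred only from how the snippets use them (event.type, event.transcript, event.text, event.audio, and the create() constructors). Every class here is an assumption made for illustration, including the stt_chunk name for partial transcripts; the real helper module may look different.

```python
from dataclasses import dataclass
from typing import Literal, Union


@dataclass(frozen=True)
class STTChunkEvent:
    """Partial transcript emitted while the caller is still speaking."""
    transcript: str
    type: Literal["stt_chunk"] = "stt_chunk"


@dataclass(frozen=True)
class STTOutputEvent:
    """Final transcript for one utterance; this is what triggers the agent."""
    transcript: str
    type: Literal["stt_output"] = "stt_output"


@dataclass(frozen=True)
class AgentChunkEvent:
    """A piece of the agent's streamed text response."""
    text: str
    type: Literal["agent_chunk"] = "agent_chunk"

    @classmethod
    def create(cls, text: str) -> "AgentChunkEvent":
        return cls(text=text)


@dataclass(frozen=True)
class TTSChunkEvent:
    """A chunk of synthesized audio bytes ready for playback."""
    audio: bytes
    type: Literal["tts_chunk"] = "tts_chunk"

    @classmethod
    def create(cls, audio: bytes) -> "TTSChunkEvent":
        return cls(audio=audio)


# Any stage can accept or emit any of these events
VoiceAgentEvent = Union[STTChunkEvent, STTOutputEvent, AgentChunkEvent, TTSChunkEvent]
```

Giving every event a literal type field lets each stage pass through events it does not care about and react only to the ones it does, which is how the later stages filter on event.type.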

Stage 2: Emergency Triage Agent

The second stage is the emergency triage agent. It analyzes the caller’s transcript, assesses what kind of help is needed, decides which tool to use, and talks to the caller calmly.

The agent has four tools: location lookup, emergency dispatch, escalation to a human operator, and a calming protocol for non-life-threatening distress.

from uuid import uuid4

from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langgraph.checkpoint.memory import InMemorySaver


# Active call registry
active_calls = {}


def get_caller_location(caller_id: str) -> str:
    """Look up the caller's registered address or last known GPS location."""
    locations = {
        "caller_001": "12 MG Road, Bengaluru, Karnataka 560001",
        "caller_002": "45 Park Street, Kolkata, West Bengal 700016",
    }

    return locations.get(
        caller_id,
        "Location not found. Ask caller to confirm address.",
    )


def dispatch_emergency(service: str, location: str, severity: str) -> str:
    """Dispatch police, ambulance, or fire services to a location."""
    valid_services = ["ambulance", "police", "fire"]

    if service.lower() not in valid_services:
        return f"Unknown service: {service}. Use ambulance, police, or fire."

    return (
        f"{service.capitalize()} dispatched to {location}. "
        f"Severity: {severity}. ETA: 8-12 minutes. "
        f"Reference: EM-{uuid4().hex[:6].upper()}"
    )


def escalate_to_human(caller_id: str, reason: str) -> str:
    """Escalate the call to a human operator when the situation exceeds AI capability."""
    active_calls[caller_id] = {
        "status": "escalated",
        "reason": reason,
    }

    return (
        f"Escalating call {caller_id} to human operator. "
        f"Reason: {reason}. Hold time: under 2 minutes."
    )


def calming_protocol(situation: str) -> str:
    """Return guided breathing or grounding instructions for distressed callers."""
    return (
        "I hear you. You are safe right now. "
        "Take a slow breath in for 4 counts, hold for 4, out for 4. "
        "I am here with you."
    )


agent = create_agent(
    model="openai:gpt-4o-mini",
    tools=[
        get_caller_location,
        dispatch_emergency,
        escalate_to_human,
        calming_protocol,
    ],
    system_prompt="""You are ARIA, an AI emergency response assistant for a 24/7 helpline.

Your job is to stay calm, assess the situation quickly, and take the right action.

Rules you must always follow:

- Always acknowledge the caller's distress before asking questions.
- Ask only one question at a time. Never overwhelm a panicking caller.
- If someone mentions chest pain, difficulty breathing, or unconsciousness — dispatch ambulance immediately.
- If someone mentions violence, threats, or break-in — dispatch police immediately.
- If the situation is unclear or emotional crisis — use calming protocol first.
- Escalate to a human operator if the caller is unresponsive or the situation is ambiguous.
- Keep every response under 3 sentences. Short and clear saves lives.
- Do NOT use emojis, asterisks, bullet points, or markdown. You are speaking aloud.""",
    checkpointer=InMemorySaver(),
)

The InMemorySaver checkpointer plays an important role here: it lets ARIA remember the entire call history, including:

  • what the caller said three turns ago,
  • which services have already been dispatched,
  • whether the caller confirmed their location, etc.

Without memory, every response would start from scratch, which would be dangerous in an emergency.
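The idea behind per-call memory can be illustrated with a toy stand-in. The class below is not how LangGraph implements InMemorySaver; ToyCallMemory and its methods are hypothetical names, and the sketch only shows the two properties that matter here: history is keyed by thread_id, and separate calls never see each other’s messages.

```python
from collections import defaultdict
from typing import Dict, List


class ToyCallMemory:
    """In-process message history keyed by thread_id (lost on restart)."""

    def __init__(self) -> None:
        self._threads: Dict[str, List[dict]] = defaultdict(list)

    def append(self, thread_id: str, role: str, content: str) -> None:
        self._threads[thread_id].append({"role": role, "content": content})

    def history(self, thread_id: str) -> List[dict]:
        # Return a copy so callers cannot mutate the stored history
        return list(self._threads.get(thread_id, []))


memory = ToyCallMemory()
memory.append("call-a", "caller", "My husband collapsed.")
memory.append("call-a", "agent", "An ambulance is on the way.")
memory.append("call-b", "caller", "Someone is breaking in.")

print(len(memory.history("call-a")))  # → 2
print(len(memory.history("call-b")))  # → 1
```

Like this toy, an in-memory checkpointer keeps state only for the lifetime of the process, so a restart mid-call would lose the history.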

Next, here is the streaming agent stage:

async def agent_stream(
    event_stream: AsyncIterator[VoiceAgentEvent],
) -> AsyncIterator[VoiceAgentEvent]:
    thread_id = str(uuid4())  # Unique per call session

    async for event in event_stream:
        yield event

        if event.type == "stt_output":
            stream = agent.astream(
                {"messages": [HumanMessage(content=event.transcript)]},
                {"configurable": {"thread_id": thread_id}},
                stream_mode="messages",
            )

            async for message, _ in stream:
                if message.text:
                    yield AgentChunkEvent.create(message.text)

stream_mode="messages" sends tokens to TTS as they are generated, so ARIA’s first words are already being spoken before she has finished composing her response. That is the difference between a 400-millisecond response and a 2-second one.
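The gain from streaming is the gap between time-to-first-token and time-to-last-token. The sketch below measures both on a fake token stream; fake_token_stream and measure are hypothetical helpers, and the 10 ms delay is an arbitrary stand-in for model latency, not a measurement of any real model.

```python
import asyncio
import time
from typing import AsyncIterator, List, Optional, Tuple


async def fake_token_stream(tokens: List[str], delay: float) -> AsyncIterator[str]:
    """Emit tokens one by one with a fixed delay before each."""
    for token in tokens:
        await asyncio.sleep(delay)
        yield token


async def measure(stream: AsyncIterator[str]) -> Tuple[Optional[float], float, str]:
    """Return (seconds to first token, seconds to last token, full text)."""
    start = time.perf_counter()
    first: Optional[float] = None
    parts: List[str] = []
    async for token in stream:
        if first is None:
            first = time.perf_counter() - start
        parts.append(token)
    total = time.perf_counter() - start
    return first, total, "".join(parts)


first_s, total_s, text = asyncio.run(
    measure(fake_token_stream(["Help ", "is ", "on ", "the ", "way."], 0.01))
)
print(f"first token after {first_s * 1000:.0f} ms, full text after {total_s * 1000:.0f} ms")
```

A pipeline that waits for the full text pays total_s before TTS can start, while a streaming pipeline can begin after first_s.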

Stage 3: Text-to-Speech with OpenAI TTS

OpenAI TTS is a natural choice: the agent already uses an OpenAI API key, so you get one API key, one SDK, and no additional accounts. The tts-1 model is designed for real-time, streaming text-to-speech. The shimmer voice is calm, clear, and measured, all qualities that suit an emergency call.

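import asyncio
from typing import AsyncIterator

from openai import AsyncOpenAI
from utils import merge_async_iters

# VoiceAgentEvent and TTSChunkEvent are project helpers not shown in this article.

client = AsyncOpenAI()

SENTENCE_ENDINGS = (".", "!", "?")


async def tts_stream(
    event_stream: AsyncIterator[VoiceAgentEvent],
) -> AsyncIterator[VoiceAgentEvent]:
    # Finished sentences travel from the upstream reader to the synthesizer.
    # None is the sentinel that tells the synthesizer the stream has ended.
    sentences = asyncio.Queue()

    async def process_upstream() -> AsyncIterator[VoiceAgentEvent]:
        text_buffer = ""
        try:
            async for event in event_stream:
                yield event

                if event.type == "agent_chunk":
                    text_buffer += event.text
                    ready = text_buffer.rstrip().endswith(SENTENCE_ENDINGS)
                else:
                    # Any other event means the agent's turn is over: flush
                    ready = True

                if ready and text_buffer.strip():
                    sentences.put_nowait(text_buffer)
                    text_buffer = ""
        finally:
            if text_buffer.strip():
                sentences.put_nowait(text_buffer)
            sentences.put_nowait(None)

    async def synthesize_audio() -> AsyncIterator[VoiceAgentEvent]:
        # Synthesize each sentence as soon as it is complete
        while (sentence := await sentences.get()) is not None:
            async with client.audio.speech.with_streaming_response.create(
                model="tts-1",
                voice="shimmer",  # Calm and composed, right for emergencies
                input=sentence,
                response_format="pcm",  # Raw PCM for lowest-latency playback
            ) as response:
                async for chunk in response.iter_bytes(chunk_size=4096):
                    yield TTSChunkEvent.create(chunk)

    async for event in merge_async_iters(
        process_upstream(),
        synthesize_audio(),
    ):
        yield event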

tts-1 starts streaming audio chunks as soon as synthesis begins rather than waiting until the entire clip has been generated. Setting response_format="pcm" skips the container overhead, so the raw audio can be written straight to the WebSocket byte stream. tts-1-hd improves quality but adds about 200 ms of latency compared with tts-1. For emergency calls, tts-1 is the better choice.
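Because raw PCM has no header, the client has to know the audio format in advance. As a rough sketch, the helper below converts a chunk size into playback time. It assumes 24 kHz, 16-bit, mono samples, which is my understanding of OpenAI’s pcm output and should be verified against the current API reference; note that this differs from the 16 kHz rate used on the STT side. The function name pcm_duration_ms is hypothetical.

```python
def pcm_duration_ms(
    num_bytes: int,
    sample_rate: int = 24_000,   # assumed OpenAI pcm output rate
    bytes_per_sample: int = 2,   # 16-bit samples
    channels: int = 1,           # mono
) -> float:
    """Return the playback duration of a raw PCM buffer in milliseconds."""
    frame_size = bytes_per_sample * channels
    if num_bytes % frame_size:
        raise ValueError("buffer does not contain a whole number of frames")
    frames = num_bytes // frame_size
    return frames * 1000 / sample_rate


# One 4096-byte chunk, as requested by iter_bytes(chunk_size=4096)
print(round(pcm_duration_ms(4096), 1))  # → 85.3
```

Under these assumptions each 4096-byte chunk carries about 85 ms of speech, which gives a feel for how finely the audio is sliced on its way to the caller.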

Several voices are available: alloy is neutral and confident, echo has a little warmth, and shimmer is soft yet steady. All three suit a helpline. Avoid fable and onyx, which can sound detached or too authoritative, respectively.

With merge_async_iters, text accumulation and audio synthesis run concurrently, so the audio byte stream starts flowing as soon as the first sentence is finished.
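The article imports merge_async_iters from a utils module without showing it. One way such a helper could be written is sketched below with an asyncio.Queue; this is an assumed implementation, not the actual utils code, and for brevity it does not re-raise errors from the source iterators. The ticker and demo functions exist only to exercise it.

```python
import asyncio
from typing import AsyncIterator, List

_DONE = object()  # sentinel: one source iterator has finished


async def merge_async_iters(*iterators: AsyncIterator) -> AsyncIterator:
    """Yield items from several async iterators in the order they arrive."""
    queue: asyncio.Queue = asyncio.Queue()

    async def drain(iterator: AsyncIterator) -> None:
        try:
            async for item in iterator:
                queue.put_nowait(item)
        finally:
            queue.put_nowait(_DONE)

    # Each source runs in its own task so none of them blocks the others
    tasks = [asyncio.create_task(drain(it)) for it in iterators]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item
    finally:
        # Stop any source still running if the consumer exits early
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def ticker(name: str, count: int, delay: float) -> AsyncIterator[str]:
    for i in range(count):
        await asyncio.sleep(delay)
        yield f"{name}{i}"


async def demo() -> List[str]:
    merged = merge_async_iters(ticker("text", 3, 0.01), ticker("audio", 2, 0.015))
    return [item async for item in merged]


merged_items = asyncio.run(demo())
print(merged_items)
```

Items from the two tickers come out interleaved by arrival time while each source keeps its own internal order, which is the behavior the TTS stage needs to pass events through while audio is being produced.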

Connecting the Full Pipeline

LangChain’s RunnableGenerator connects all three stages into a single composable pipeline:

import contextlib

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from langchain_core.runnables import RunnableGenerator


app = FastAPI()

# Chain the three stages: audio bytes in, voice agent events out
pipeline = (
    RunnableGenerator(stt_stream)
    | RunnableGenerator(agent_stream)
    | RunnableGenerator(tts_stream)
)


@app.websocket("/ws/{caller_id}")
async def websocket_endpoint(websocket: WebSocket, caller_id: str):
    await websocket.accept()

    active_calls[caller_id] = {"status": "active"}

    async def audio_stream():
        # Yield raw audio until the caller disconnects
        try:
            while True:
                yield await websocket.receive_bytes()
        except WebSocketDisconnect:
            return

    try:
        async for event in pipeline.atransform(audio_stream()):
            if event.type == "tts_chunk":
                await websocket.send_bytes(event.audio)
    finally:
        # Runs on normal completion and on dropped connections alike
        active_calls[caller_id]["status"] = "ended"

        # The socket may already be closed if the caller disconnected
        with contextlib.suppress(RuntimeError):
            await websocket.close()

Note the caller_id in the WebSocket route. Each call is tracked from the moment it connects until it ends. The finally block updates the call registry even if the connection drops mid-call, which can happen in real emergencies.
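The registry bookkeeping can also be pulled into a small context manager so that it is hard to forget. The sketch below uses a hypothetical track_call helper with its own active_calls dict. Unlike the endpoint above, it deliberately leaves an "escalated" status untouched when the call ends, which is a design choice of this sketch rather than something the article specifies.

```python
from contextlib import contextmanager
from typing import Dict, Iterator

active_calls: Dict[str, dict] = {}


@contextmanager
def track_call(caller_id: str) -> Iterator[None]:
    """Mark a call active on entry and ended on exit, even after an error."""
    active_calls[caller_id] = {"status": "active"}
    try:
        yield
    finally:
        # Look the record up again: a tool may have replaced it mid-call
        record = active_calls.setdefault(caller_id, {})
        if record.get("status") != "escalated":
            record["status"] = "ended"


# A dropped connection is simulated with an exception
try:
    with track_call("caller_001"):
        raise ConnectionError("caller lost signal")
except ConnectionError:
    pass

print(active_calls["caller_001"]["status"])  # → ended
```

Because the context manager contains no awaits, it can be used with a plain with statement inside an async WebSocket handler.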

Testing the Voice Agent

We have built the entire pipeline, so let’s test it against several scenarios.

Scenario 1: Medical Emergency with Chest Pain

A woman’s husband has collapsed with chest pain and numbness in his left arm. ARIA identifies a cardiac emergency, dispatches an ambulance, and gives the caller instructions while they wait.


Scenario 2: Break-In with an Active Threat

The caller is hiding in their room while someone breaks in downstairs. ARIA dispatches the police immediately and keeps the caller quiet and still until help arrives.


Scenario 3: Fire with Smoke and Confusion

A neighbor sees thick smoke in the apartment next door with no sign of anyone inside. ARIA dispatches the fire department and directs the caller to evacuate and alert the building.


Scenario 4: Emotional Crisis with a Panic Attack

The caller has not left her apartment in three days and is hyperventilating without a clear emergency. ARIA uses the calming protocol first, then dispatches an ambulance once respiratory distress is confirmed.


Conclusion

You now have a working emergency voice agent. ARIA listens around the clock, triages each call, dispatches the right service, and responds to the caller in a clear, calm voice in under 700 ms. The sandwich architecture lets you swap out any component independently.

Possible enhancements include call recording, per-response evaluation, scalable live monitoring dashboards, and voice activity detection for smooth interruption handling. These can be added without rewriting the pipeline. High-stakes voice agents are harder to build than a help desk bot because they must deliver emergency support without going silent when callers need help the most.

Riya Bansal

Data Science Trainee at Analytics Vidhya
I currently work as a Data Science Trainer at Analytics Vidhya, where I focus on building data-driven solutions and applying AI/ML techniques to solve real-world business problems. My work allows me to explore advanced analytics, machine learning, and AI applications that empower organizations to make smarter, evidence-based decisions.
With a strong foundation in computer science, software development, and data analysis, I am passionate about using AI to create impactful, innovative solutions that bridge the gap between technology and business.