
How to Design an Agentic Workflow for Tool-Driven Route Optimization with Deterministic Computation and Structured Outputs

In this tutorial, we build a production-style Route Optimizer agent for a logistics dispatch center using the latest LangChain agent APIs. We design a tool-driven workflow in which the agent computes distances, ETAs, and optimal routes through tools rather than guessing, and we enforce structured outputs so that the results are directly usable in downstream systems. We integrate geographic distance calculations, configurable speed profiles, traffic multipliers, and multi-stop route optimization, so that the numbers the agent reports come from deterministic code while the agent still reasons flexibly about which tools to call.

!pip -q install -U langchain langchain-openai pydantic


import os
from getpass import getpass


if not os.environ.get("OPENAI_API_KEY"):
   os.environ["OPENAI_API_KEY"] = getpass("Enter OPENAI_API_KEY (input hidden): ")


from typing import Dict, List, Optional, Tuple, Any
from math import radians, sin, cos, sqrt, atan2


from pydantic import BaseModel, Field, ValidationError


from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langchain.agents import create_agent

We set up the execution environment and make sure all required libraries are installed and imported. We load the OpenAI API key securely through a hidden prompt so that the agent can call the language model without hardcoded credentials. We also import the core dependencies for tools, agents, and structured outputs.

SITES: Dict[str, Dict[str, Any]] = {
   "Rig_A": {"lat": 23.5880, "lon": 58.3829, "type": "rig"},
   "Rig_B": {"lat": 23.6100, "lon": 58.5400, "type": "rig"},
   "Rig_C": {"lat": 23.4500, "lon": 58.3000, "type": "rig"},
   "Yard_Main": {"lat": 23.5700, "lon": 58.4100, "type": "yard"},
   "Depot_1": {"lat": 23.5200, "lon": 58.4700, "type": "depot"},
   "Depot_2": {"lat": 23.6400, "lon": 58.4300, "type": "depot"},
}


SPEED_PROFILES: Dict[str, float] = {
   "highway": 90.0,
   "arterial": 65.0,
   "local": 45.0,
}


DEFAULT_TRAFFIC_MULTIPLIER = 1.10


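def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
   # Great-circle distance in kilometers between two (lat, lon) points given in degrees.
   R = 6371.0  # mean Earth radius in km
   dlat = radians(lat2 - lat1)
   dlon = radians(lon2 - lon1)
   a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
   c = 2 * atan2(sqrt(a), sqrt(1 - a))  # central angle in radians
   return R * c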

We define the core domain data: rigs, yards, and depots with their geographic coordinates. We set speed profiles per road class (in km/h) and a default traffic multiplier to approximate realistic driving conditions. We also implement the Haversine distance function, which computes great-circle distance and serves as the mathematical backbone of all routing decisions.
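As a quick sanity check on the Haversine formula, the sketch below reproduces the function in standalone form and evaluates it on three cases: a point against itself, one degree of latitude along a meridian, and the Yard_Main to Rig_B coordinates from the site table. The variable names are illustrative and not part of the tutorial code.

```python
from math import radians, sin, cos, sqrt, atan2


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in km between two points given in degrees."""
    R = 6371.0  # mean Earth radius in km
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))  # central angle in radians
    return R * c


# Identical points should be zero distance apart.
same_point = haversine_km(23.57, 58.41, 23.57, 58.41)

# One degree of latitude along a meridian is R * pi / 180, a little over 111 km.
one_degree_lat = haversine_km(0.0, 0.0, 1.0, 0.0)

# Yard_Main -> Rig_B, using the coordinates from SITES.
yard_to_rig_b = haversine_km(23.5700, 58.4100, 23.6100, 58.5400)

print(same_point, round(one_degree_lat, 2), round(yard_to_rig_b, 2))
```

The result is a straight-line (great-circle) distance, so it will generally understate actual road distance; the traffic multiplier only partly compensates for that.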

def _normalize_site_name(name: str) -> str:
   return name.strip()


def _assert_site_exists(name: str) -> None:
   if name not in SITES:
       raise ValueError(f"Unknown site '{name}'. Use list_sites() or suggest_site().")


def _distance_between(a: str, b: str) -> float:
   _assert_site_exists(a)
   _assert_site_exists(b)
   sa, sb = SITES[a], SITES[b]
   return float(haversine_km(sa["lat"], sa["lon"], sb["lat"], sb["lon"]))


def _eta_minutes(distance_km: float, speed_kmph: float, traffic_multiplier: float) -> float:
   speed = max(float(speed_kmph), 1e-6)
   base_minutes = (distance_km / speed) * 60.0
   return float(base_minutes * max(float(traffic_multiplier), 0.0))


def compute_route_metrics(path: List[str], speed_kmph: float, traffic_multiplier: float) -> Dict[str, Any]:
   if len(path) < 2:
       raise ValueError("Route path must include at least origin and destination.")
   for s in path:
       _assert_site_exists(s)
   legs = []
   total_km = 0.0
   total_min = 0.0
   for i in range(len(path) - 1):
       a, b = path[i], path[i + 1]
       d_km = _distance_between(a, b)
       t_min = _eta_minutes(d_km, speed_kmph, traffic_multiplier)
       legs.append({"from": a, "to": b, "distance_km": d_km, "eta_minutes": t_min})
       total_km += d_km
       total_min += t_min
   return {"route": path, "distance_km": float(total_km), "eta_minutes": float(total_min), "legs": legs}

We create low-level utility functions that validate site names and compute distances and travel times. We calculate per-leg and total route metrics deterministically. This ensures that every ETA and distance the agent returns comes from explicit computation rather than from the model's assumptions.
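To make the ETA arithmetic concrete, here is a minimal standalone sketch of the same calculation: travel time is distance divided by speed, converted to minutes, then scaled by the traffic multiplier. The 13 km leg is a made-up input chosen so the numbers are easy to check by hand; the function name is illustrative.

```python
def eta_minutes(distance_km: float, speed_kmph: float, traffic_multiplier: float) -> float:
    """Travel time in minutes: (distance / speed) * 60, scaled by a traffic factor."""
    speed = max(float(speed_kmph), 1e-6)             # guard against division by zero
    base_minutes = (distance_km / speed) * 60.0
    return base_minutes * max(float(traffic_multiplier), 0.0)


# Hypothetical leg: 13 km on an arterial road (65 km/h) gives a 12-minute base time.
base = eta_minutes(13.0, 65.0, 1.0)

# Applying the default 1.10 multiplier adds 10 percent on top of the base time.
with_traffic = eta_minutes(13.0, 65.0, 1.10)

print(round(base, 2), round(with_traffic, 2))
```

Because the multiplier is applied per leg and the legs are summed, a route's total ETA scales linearly with the multiplier at a fixed speed. Ranking by ETA and ranking by distance therefore give the same order unless different legs use different speeds.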

def _all_paths_with_waypoints(origin: str, destination: str, waypoints: List[str], max_stops: int) -> List[List[str]]:
   from itertools import permutations
   waypoints = [w for w in waypoints if w not in (origin, destination)]
   max_stops = int(max(0, max_stops))
   candidates = []
   for k in range(0, min(len(waypoints), max_stops) + 1):
       for perm in permutations(waypoints, k):
           candidates.append([origin, *perm, destination])
   if [origin, destination] not in candidates:
       candidates.insert(0, [origin, destination])
   return candidates


def find_best_route(origin: str, destination: str, allowed_waypoints: Optional[List[str]], max_stops: int, speed_kmph: float, traffic_multiplier: float, objective: str, top_k: int) -> Dict[str, Any]:
   origin = _normalize_site_name(origin)
   destination = _normalize_site_name(destination)
   _assert_site_exists(origin)
   _assert_site_exists(destination)
   # Normalize once so the validated names are the same ones used to build candidates.
   allowed_waypoints = [_normalize_site_name(w) for w in (allowed_waypoints or [])]
   for w in allowed_waypoints:
       _assert_site_exists(w)
   objective = (objective or "eta").strip().lower()
   if objective not in {"eta", "distance"}:
       raise ValueError("objective must be one of: 'eta', 'distance'")
   top_k = max(1, int(top_k))
   candidates = _all_paths_with_waypoints(origin, destination, allowed_waypoints, max_stops=max_stops)
   scored = []
   for path in candidates:
       metrics = compute_route_metrics(path, speed_kmph=speed_kmph, traffic_multiplier=traffic_multiplier)
       score = metrics["eta_minutes"] if objective == "eta" else metrics["distance_km"]
       scored.append((score, metrics))
   scored.sort(key=lambda x: x[0])
   best = scored[0][1]
   alternatives = [m for _, m in scored[1:top_k]]
   return {"best": best, "alternatives": alternatives, "objective": objective}

We introduce multi-stop routing logic by generating candidate routes with optional waypoints. We score each candidate against a clear optimization objective, either ETA or distance. We then rank the routes and extract the best option along with the next-best alternatives.
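The candidate generation is a brute-force enumeration, so it helps to see how many routes it produces. The sketch below is a compact standalone version of the same idea (the function name `enumerate_candidates` is illustrative), applied to the three waypoints used later in the tutorial with `max_stops=2`.

```python
from itertools import permutations
from typing import List


def enumerate_candidates(origin: str, destination: str,
                         waypoints: List[str], max_stops: int) -> List[List[str]]:
    """All ordered routes using 0..max_stops distinct waypoints between the endpoints."""
    pool = [w for w in waypoints if w not in (origin, destination)]
    limit = min(len(pool), max(0, int(max_stops)))
    return [
        [origin, *perm, destination]
        for k in range(limit + 1)          # k = number of intermediate stops
        for perm in permutations(pool, k)  # order matters, so permutations
    ]


routes = enumerate_candidates(
    "Rig_C", "Rig_B", ["Depot_1", "Depot_2", "Yard_Main"], max_stops=2
)

# 1 direct route + 3 one-stop routes + 6 two-stop routes
print(len(routes))
print(routes[0])
```

With n waypoints and up to m stops, the count is the sum of n!/(n-k)! for k from 0 to m, which grows quickly. Exhaustive scoring is reasonable for a handful of sites like this, but a larger network would likely need a heuristic or a dedicated solver.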

@tool
def list_sites(site_type: Optional[str] = None) -> List[str]:
   if site_type:
       st = site_type.strip().lower()
       return sorted([k for k, v in SITES.items() if str(v.get("type", "")).lower() == st])
   return sorted(SITES.keys())


@tool
def get_site_details(site: str) -> Dict[str, Any]:
   s = _normalize_site_name(site)
   _assert_site_exists(s)
   return {"site": s, **SITES[s]}


@tool
def suggest_site(query: str, max_suggestions: int = 5) -> List[str]:
   q = (query or "").strip().lower()
   max_suggestions = max(1, int(max_suggestions))
   scored = []
   for name in SITES.keys():
       n = name.lower()
       common = len(set(q) & set(n))
       bonus = 5 if q and q in n else 0
       scored.append((common + bonus, name))
   scored.sort(key=lambda x: x[0], reverse=True)
   return [name for _, name in scored[:max_suggestions]]


@tool
def compute_direct_route(origin: str, destination: str, road_class: str = "arterial", traffic_multiplier: float = DEFAULT_TRAFFIC_MULTIPLIER) -> Dict[str, Any]:
   origin = _normalize_site_name(origin)
   destination = _normalize_site_name(destination)
   rc = (road_class or "arterial").strip().lower()
   if rc not in SPEED_PROFILES:
       raise ValueError(f"Unknown road_class '{road_class}'. Use one of: {sorted(SPEED_PROFILES.keys())}")
   speed = SPEED_PROFILES[rc]
   return compute_route_metrics([origin, destination], speed_kmph=speed, traffic_multiplier=float(traffic_multiplier))


@tool
def optimize_route(origin: str, destination: str, allowed_waypoints: Optional[List[str]] = None, max_stops: int = 2, road_class: str = "arterial", traffic_multiplier: float = DEFAULT_TRAFFIC_MULTIPLIER, objective: str = "eta", top_k: int = 3) -> Dict[str, Any]:
   origin = _normalize_site_name(origin)
   destination = _normalize_site_name(destination)
   rc = (road_class or "arterial").strip().lower()
   if rc not in SPEED_PROFILES:
       raise ValueError(f"Unknown road_class '{road_class}'. Use one of: {sorted(SPEED_PROFILES.keys())}")
   speed = SPEED_PROFILES[rc]
   allowed_waypoints = allowed_waypoints or []
   allowed_waypoints = [_normalize_site_name(w) for w in allowed_waypoints]
   return find_best_route(origin, destination, allowed_waypoints, int(max_stops), float(speed), float(traffic_multiplier), str(objective), int(top_k))

We expose the routing and lookup logic as callable tools for the agent. The agent can list sites, inspect site details, resolve ambiguous names, and compute both direct and optimized routes. This tool layer keeps the agent grounded: it calls validated functions rather than hallucinating results.
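The name-matching heuristic behind `suggest_site` is easy to check on its own. The sketch below copies the scoring logic into a plain function without the `@tool` decorator (the name `rank_site_names` is illustrative): each site scores one point per distinct character it shares with the query, plus a bonus of 5 when the query is a substring of the site name.

```python
from typing import List


def rank_site_names(query: str, names: List[str], max_suggestions: int = 5) -> List[str]:
    """Rank names by shared distinct characters, with a +5 bonus for substring matches."""
    q = (query or "").strip().lower()
    scored = []
    for name in names:
        n = name.lower()
        common = len(set(q) & set(n))      # distinct characters in common
        bonus = 5 if q and q in n else 0   # strong signal: query appears verbatim
        scored.append((common + bonus, name))
    # Python's sort is stable, so ties keep their original order.
    scored.sort(key=lambda item: item[0], reverse=True)
    return [name for _, name in scored[:max(1, int(max_suggestions))]]


site_names = ["Rig_A", "Rig_B", "Rig_C", "Yard_Main", "Depot_1", "Depot_2"]

yard_matches = rank_site_names("yard", site_names, max_suggestions=2)
depot_matches = rank_site_names("depot", site_names, max_suggestions=2)

print(yard_matches)
print(depot_matches)
```

Character-set overlap is a coarse measure: it ignores character order and repetition, so unrelated names can score above zero. For a larger site list, an edit-distance matcher such as the standard library's `difflib.get_close_matches` might be a better fit.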

class RouteLeg(BaseModel):
   from_site: str
   to_site: str
   distance_km: float
   eta_minutes: float


class RoutePlan(BaseModel):
   route: List[str]
   distance_km: float
   eta_minutes: float
   legs: List[RouteLeg]
   objective: str


class RouteDecision(BaseModel):
   chosen: RoutePlan
   alternatives: List[RoutePlan] = []
   assumptions: Dict[str, Any] = {}
   notes: str = ""
   audit: List[str] = []


llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.2)


SYSTEM_PROMPT = (
   "You are the Route Optimizer Agent for a logistics dispatch center.\n"
   "You MUST use tools for any distance/ETA calculation.\n"
   "Return ONLY the structured RouteDecision."
)


route_agent = create_agent(
   model=llm,
   tools=[list_sites, get_site_details, suggest_site, compute_direct_route, optimize_route],
   system_prompt=SYSTEM_PROMPT,
   response_format=RouteDecision,
)


def get_route_decision(origin: str, destination: str, road_class: str = "arterial", traffic_multiplier: float = DEFAULT_TRAFFIC_MULTIPLIER, allowed_waypoints: Optional[List[str]] = None, max_stops: int = 2, objective: str = "eta", top_k: int = 3) -> RouteDecision:
   user_msg = {
       "role": "user",
       "content": (
           f"Optimize the route from {origin} to {destination}.\n"
           f"road_class={road_class}, traffic_multiplier={traffic_multiplier}\n"
           f"objective={objective}, top_k={top_k}\n"
           f"allowed_waypoints={allowed_waypoints}, max_stops={max_stops}\n"
           "Return the structured RouteDecision only."
       ),
   }
   result = route_agent.invoke({"messages": [user_msg]})
   return result["structured_response"]


decision1 = get_route_decision("Yard_Main", "Rig_B", road_class="arterial", traffic_multiplier=1.12)
print(decision1.model_dump())


decision2 = get_route_decision("Rig_C", "Rig_B", road_class="highway", traffic_multiplier=1.08, allowed_waypoints=["Depot_1", "Depot_2", "Yard_Main"], max_stops=2, objective="eta", top_k=3)
print(decision2.model_dump())

We define strict Pydantic schemas to enforce structured, machine-readable outputs from the agent. We initialize the language model and create the agent with a clear system prompt and response format. We then show how to invoke the agent and obtain routing decisions suitable for real-world workflows.
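One detail to keep in mind: `compute_route_metrics` emits each leg with the keys `from` and `to`, while `RouteLeg` declares `from_site` and `to_site`. In the tutorial the model performs that renaming when it fills in the structured response. If you want to validate raw tool output directly, one option is a field alias. This is a minimal sketch assuming Pydantic v2; the class name `RouteLegFromTool` is hypothetical and the numeric values are placeholders, not computed results.

```python
from pydantic import BaseModel, ConfigDict, Field


class RouteLegFromTool(BaseModel):
    """Hypothetical variant of RouteLeg that also accepts the tool's 'from'/'to' keys."""

    # Allow construction by field name as well as by alias.
    model_config = ConfigDict(populate_by_name=True)

    from_site: str = Field(alias="from")
    to_site: str = Field(alias="to")
    distance_km: float
    eta_minutes: float


# Shape of one leg as returned by compute_route_metrics (placeholder numbers).
raw_leg = {"from": "Yard_Main", "to": "Rig_B", "distance_km": 14.0, "eta_minutes": 14.5}

leg = RouteLegFromTool.model_validate(raw_leg)

print(leg.from_site, "->", leg.to_site)
print(leg.model_dump())               # keyed by field names
print(leg.model_dump(by_alias=True))  # keyed by the original 'from'/'to'
```

Validating tool output against the schema in code, rather than relying on the model to copy values faithfully, gives a second check that the numbers in the final decision match what the tools computed.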

In conclusion, we implemented a route optimization agent that selects the best path between sites while stating its assumptions and alternatives. We showed how combining deterministic routing logic with LLM tool calling produces reliable, auditable decisions suitable for real logistics operations. This foundation can be extended with live traffic data, fleet constraints, or cost-based objectives, making the agent a practical component in a larger dispatch or fleet-management system.



