import os
import sys
import pathlib
import subprocess
import time
import fire
import numpy as np
import logging
import datetime
from typing import List, Tuple
from lifelong_mapf_argos.ArgosConfig import (SERVER_EXE, PBS_EXE, TPBS_EXE,
RHCR_EXE, MASS_EXE,
CONTAINER_PROJECT_ROOT,
PROJECT_ROOT, setup_logging)
from lifelong_mapf_argos.ArgosConfig.ToArgos import (obstacles, parse_map_file,
create_Argos)
# Module-level logger, named after this module; used by every function below.
logger = logging.getLogger(__name__)
def get_current_time() -> str:
    """Return the current local time with millisecond precision.

    Returns:
        A string of the form ``YYYY-MM-DD HH:MM:SS.mmm``; the fractional
        part is truncated (not rounded) to milliseconds.
    """
    now = datetime.datetime.now()
    # A naive datetime carries no UTC offset, so isoformat() yields exactly
    # date, separator, time and the three millisecond digits.
    return now.isoformat(sep=" ", timespec="milliseconds")
def init_start_locations(
    map_str: List[str],
    num_agents: int,
) -> List[Tuple[str, str]]:
    """Randomly pick distinct free cells to use as robot start locations.

    A cell is free when its character is not contained in ``obstacles``.
    Sampling uses NumPy's global random state, so the result is reproducible
    after ``np.random.seed``.

    Args:
        map_str: the map as a list of rows, one character per cell. Every
            row is read up to the width of the first row.
        num_agents: number of start locations to draw.

    Returns:
        ``num_agents`` distinct ``(col, row)`` pairs, each coordinate
        formatted as a string.

    Raises:
        SystemExit: with exit code -1 if the map has fewer free cells than
            ``num_agents``.
    """
    # An empty map has no columns; guard the lookup of the first row so the
    # "not enough free cells" check below is reached instead of an IndexError.
    w = len(map_str[0]) if map_str else 0

    # Free cells are stored as flattened row-major indices (row * w + col).
    free_locations = [
        i * w + j
        for i, row in enumerate(map_str)
        for j in range(w)
        if row[j] not in obstacles
    ]

    if len(free_locations) < num_agents:
        logger.error(
            f"Number of agents ({num_agents}) exceeds number of free locations ({len(free_locations)})."
        )
        # sys.exit rather than the interactive-only ``exit`` helper, which
        # is not guaranteed to exist (e.g. when Python runs with -S).
        sys.exit(-1)

    # Randomly select start locations without repetition.
    starts = np.random.choice(free_locations, size=num_agents, replace=False)

    # Convert each flattened index back to coordinates and emit them as
    # (col, row) rather than (row, col), because this is how Argos expects it.
    return [(str(start % w), str(start // w)) for start in starts]
def check_file(file_path: str) -> bool:
    """Report whether ``file_path`` exists on disk.

    Args:
        file_path: path to check.

    Returns:
        True if the path exists; otherwise False, after logging an
        info-level message naming the missing path.
    """
    if os.path.exists(file_path):
        return True
    logger.info(f"{file_path} not exists!")
    return False
def run_simulator(args, timeout: float = None, output_log: str = None):
    """Launch the server, the Argos client and the planner, then wait.

    The three processes are started in that order. The function waits for
    the client, then for the server, and finally kills the planner. Whatever
    happens (normal completion, timeout, a process failing to start, or an
    exception such as ``KeyboardInterrupt``), every process that was started
    is killed if still alive and reaped, and the log file is closed.

    Args:
        args: 3-tuple ``(server_command, client_command, planner_command)``;
            each element is an argument list passed to ``subprocess.Popen``.
        timeout: maximum number of seconds to wait for the client, and then
            again for the server. ``None`` waits indefinitely.
        output_log: path of a file that receives stdout and stderr of all
            three processes plus the progress lines printed here. If falsy,
            the children inherit this process's streams and the progress
            lines go to stdout.
    """
    server_command, client_command, planner_command = args
    f = open(output_log, 'w') if output_log else None
    # Every successfully started process is recorded so the cleanup below
    # can terminate it even if a later Popen call raises.
    started = []
    try:
        # Start the server process
        server_process = subprocess.Popen(server_command, stdout=f, stderr=f)
        started.append(server_process)

        # Wait for a short period to ensure the server has started
        time.sleep(1)

        # Start the client process
        client_process = subprocess.Popen(client_command, stdout=f, stderr=f)
        started.append(client_process)

        planner_process = subprocess.Popen(planner_command,
                                           stdout=f,
                                           stderr=f)
        started.append(planner_process)

        # The client process will call the server to end, then the client
        # ends. The planner is killed explicitly once the server is done.
        try:
            # Progress lines are flushed immediately: the children write to
            # the same file directly, so buffered lines would otherwise show
            # up out of order (at close time).
            client_process.wait(timeout=timeout)
            print(f"[{get_current_time()}] [Py] Client process finished.",
                  file=f,
                  flush=True)
            server_process.wait(timeout=timeout)
            print(f"[{get_current_time()}] [Py] Server process finished.",
                  file=f,
                  flush=True)
            planner_process.kill()
            print(f"[{get_current_time()}] [Py] Planner process finished.",
                  file=f,
                  flush=True)
        except subprocess.TimeoutExpired:
            # The actual killing happens in the cleanup below.
            logger.info("Timeout expired, killing processes...")
    finally:
        try:
            # Kill whatever is still running, then reap every child so no
            # zombie process is left behind.
            for process in started:
                if process.poll() is None:
                    process.kill()
            for process in started:
                process.wait()
        finally:
            if f:
                f.close()
        logger.info("Processes killed.")
# Entry point exposed on the command line via fire.Fire (see the __main__ guard).
def run_lifelong_argos(
    map_filepath: str = "maps/kiva_large_w_mode.json",
    num_agents: int = 100,
    headless: bool = False,
    argos_config_filepath: str = "output.argos",
    stats_name: str = "stats.json",
    save_stats: bool = False,
    output_log: str = None,
    port_num: int = 8182,
    n_threads: int = 1,
    sim_duration: int = 600 * 10,
    sim_window_tick: int = 20,
    ticks_per_second: int = 10,
    velocity: float = 200.0,
    planner: str = "RHCR",
    container: bool = False,
    seed: int = 42,
    screen: int = 0,
    backup_solver: str = "PIBT",
    planner_invoke_policy: str = "default",
    task_assigner_type: str = "windowed",
    planning_window: int = 10,
    frame_grab: bool = False,
    cutoffTime: int = 1,
    # RHCR parameters
    solver: str = "PBS",
    single_agent_solver: str = "SIPP",
    rotation: bool = False,
    rotation_time: int = 1,
    prioritize_start: bool = True,
    suboptimal_bound: float = 1,
    log: bool = False,
    save_result: bool = False,
    save_solver: bool = False,
    save_heuristics_table: bool = False,
):
    r"""Function to run the LSMART simulator with the given parameters.

    The function seeds NumPy, samples the robots' start locations, writes the
    Argos configuration file, builds the command lines of the simulation
    server, the Argos client and the selected planner, and runs the three
    processes through ``run_simulator`` with a timeout of 20 times the
    simulated duration in seconds.

    Args:
        map_filepath (str, optional): file path to map. Example maps are in
            the ``maps`` directory. If maps contains workstations (``w``) and
            endpoints (``e``), robots' tasks will be assigned alternately
            between workstations and endpoints. If not, robots' tasks will be
            randomly generated from the empty spaces. Defaults to
            ``maps/kiva_large_w_mode.json``.
        num_agents (int, optional): number of robots. Defaults to 100.
        headless (bool, optional): whether run in headless mode. If False, a
            visualization will be generated. Defaults to False.
        argos_config_filepath (str, optional): file path to write the
            generated Argos config file. Defaults to "output.argos".
        stats_name (str, optional): file path to store the stats from the
            simulator. Defaults to "stats.json".
        save_stats (bool, optional): whether to save the stats. Defaults to
            False.
        output_log (str, optional): file path to store the stdout from the
            simulator. If None, the output will not be saved to a file.
            Defaults to None.
        port_num (int, optional): port number of RPC server. Defaults to
            8182.
        n_threads (int, optional): number of threads to run Argos. Defaults
            to 1.
        sim_duration (int, optional): number of simulation ticks to run the
            simulator. Defaults to 600 * 10, meaning 600 seconds with the
            default ``ticks_per_second``.
        sim_window_tick (int, optional): number of ticks to invoke the
            planner. Only applies to the periodic invocation policy.
            Defaults to 20 ticks.
        ticks_per_second (int, optional): the simulator runs in ``ticks``.
            The states of the robots are updated per tick.
            ``ticks_per_second`` specifies the number of ticks per simulation
            second used by the simulator. Defaults to 10.
        velocity (float, optional): velocity of the robots in cm/s. Defaults
            to 200.0 cm/s.
        planner (str, optional): planner to use. Options include:

            - ``RHCR``: the Rolling Horizon Collision Resolution planner,
              (`Li et al. 2021`_). RHCR plans for windowed paths for all
              robots.
            - ``MASS``: the MAPF-SSIPP-SPS planner, (`Yan et al. 2025`_).
              MASS plans for full-horizon paths with 2nd order dynamics for
              all robots.
            - ``PBS``: the Priority-Based Search planner
              (`Ma et al. 2019`_). PBS plans for full-horizon paths for all
              robots.
            - ``TPBS``: the `Transient` Priority-Based Search planner
              (`Morag et al. 2025`_). TPBS plans for full-horizon paths for
              all robots even if there are duplicate goals.

            Defaults to ``RHCR``.
        container (bool, optional): whether to run in a `singularity`_
            container. Defaults to False.
        seed (int, optional): random seed. Defaults to 42.
        screen (int, optional): logging options. Higher values increase
            verbosity. Defaults to 0.
        backup_solver (str, optional): backup solver (fail policy) used in
            case the MAPF planner fails. Options include:

            - ``PIBT``: the Priority Inheritance with Backtracking,
              (`Okumura et al. 2019`_).
            - ``LRA``: the Local Repair Guided Waits, (`Li et al. 2021`_).
            - ``GuidedPIBT``: Guided PIBT, (`Chen et al. 2024`_).

            Defaults to ``PIBT``.
        planner_invoke_policy (str, optional): planner invocation policy,
            options include:

            - ``default``: the periodic policy where the planner is invoked
              periodically every ``sim_window_tick`` ticks.
            - ``no_action``: the event-based policy where the planner is
              invoked when at least one robot has no action to execute in
              the ADG (`Hönig et al. 2019`_).

            Defaults to ``default``.
        task_assigner_type (str, optional): task assigner (MAPF problem
            instance generator) used to generate problem instances. Options
            include:

            - ``windowed``: the windowed task assigner (`Li et al. 2021`_),
              which assigns tasks within the planning window. This can only
              be used with the ``RHCR`` planner.
            - ``distinct-one-goal``: the distinct one-goal task assigner,
              which assigns each robot a distinct goal. This can only be
              used with the ``PBS`` and ``MASS`` planners.
            - ``one-goal``: the one-goal task assigner, which assigns each
              robot a goal regardless of duplicates. This can only be used
              with the ``TPBS`` planner.

            Defaults to ``windowed``.
        planning_window (int, optional): planning window in timesteps. The
            final planning window is the max of this value and the inferred
            planning window from ``sim_window_tick``.
            Specifically, given ``sim_window_tick`` and
            ``ticks_per_second``, we compute the minimal planning window in
            timesteps required to cover the simulation window as follows:

            .. math::

                planning\_window\_ts = \lceil \frac{sim\_window\_tick}{ticks\_per\_second} \times \frac{velocity}{100} \rceil

            Then given the user-specified ``planning_window``, the final
            planning window is:

            .. math::

                planning\_window = max(planning\_window\_ts, planning\_window)

            Defaults to 10.
        frame_grab (bool, optional): whether to enable frame grabber in
            Argos. If enabled, the simulator will save screenshots to the
            ``frames`` folder. The screenshots can be combined into a video
            using external tools. Defaults to False.
        cutoffTime (int, optional): time limit of the planner in seconds.
            With the periodic invocation policy (``default``), the
            ``cutoffTime`` should be less than or equal to the simulation
            window in seconds
            (:math:`\frac{sim\_window\_tick}{ticks\_per\_second}`).
            Defaults to 1.
        solver (str, optional): MAPF solver in RHCR. Options include ``PBS``
            (`Ma et al. 2019`_) and ``PIBT`` (`Okumura et al. 2019`_).
            Defaults to ``PBS``.
        single_agent_solver (str, optional): single agent solver in RHCR.
            Options include ``SIPP`` (`Phillips et al. 2011`_) and
            ``ASTAR``. Defaults to ``SIPP``.
        rotation (bool, optional): whether the single-agent planning
            consider rotation in RHCR. Defaults to False.
        rotation_time (int, optional): rotation time in timesteps in RHCR.
            Defaults to 1.
        prioritize_start (bool, optional): prioritize start in RHCR.
            Defaults to True.
        suboptimal_bound (float, optional): suboptimality bound of certain
            solvers in RHCR. Defaults to 1.
        log (bool, optional): logging in RHCR. Defaults to False.
        save_result (bool, optional): save planning results or not in RHCR.
            Defaults to False.
        save_solver (bool, optional): save MAPF solver process in RHCR or
            not. Defaults to False.
        save_heuristics_table (bool, optional): save heuristic table or not.
            Defaults to False.

    Raises:
        SystemExit: with exit code -1 if ``planner`` is not one of ``PBS``,
            ``TPBS``, ``RHCR`` or ``MASS``.

    .. _Li et al. 2021: https://arxiv.org/abs/2005.07371
    .. _Yan et al. 2025: https://arxiv.org/abs/2412.13359
    .. _Ma et al. 2019: https://arxiv.org/abs/1812.06356
    .. _Morag et al. 2025: https://ojs.aaai.org/index.php/SOCS/article/view/35998
    .. _Okumura et al. 2019: https://arxiv.org/abs/1901.11282
    .. _Chen et al. 2024: https://arxiv.org/abs/2308.11234
    .. _Hönig et al. 2019: https://ieeexplore.ieee.org/document/8620328
    .. _Phillips et al. 2011: https://www.cs.cmu.edu/~maxim/files/sipp_icra11.pdf
    .. _singularity: https://sylabs.io/singularity/
    """
    np.random.seed(seed)
    setup_logging()
    map_data, width, height = parse_map_file(map_filepath)

    # Sample the robots' start cells, then turn the map into an Argos
    # config file.
    robot_init_pos = init_start_locations(map_data, num_agents)

    # Use absolute path for argos config
    argos_config_filepath = os.path.abspath(argos_config_filepath)
    create_Argos(
        map_data=map_data,
        output_file_path=argos_config_filepath,
        width=width,
        height=height,
        robot_init_pos=robot_init_pos,
        curr_num_agent=num_agents,
        port_num=port_num,
        n_threads=n_threads,
        visualization=not headless,
        sim_duration=sim_duration,
        ticks_per_second=ticks_per_second,
        screen=screen,
        velocity=velocity,
        container=container,
        seed=seed,
        frame_grab=frame_grab,
    )
    if screen > 0:
        logger.info(f"Argos config file created at {argos_config_filepath}.")

    # Infer look_ahead_dist from cutoffTime
    # The look_ahead_dist is the number of actions the planner should
    # look ahead to find the start location for the robot. The planner will
    # return in cutoffTime seconds, so look_ahead_dist should be set to the
    # number of actions that can be executed in cutoffTime seconds.
    # Each action corresponds to half of a grid, which is 0.5 m, so the
    # shortest time it takes to execute an action is
    # 0.5 m / (velocity / 100 m/s).
    min_action_time = 0.5 / (velocity / 100)
    look_ahead_dist = np.ceil(cutoffTime / min_action_time).astype(int)
    look_ahead_tick = np.ceil(look_ahead_dist * min_action_time *
                              ticks_per_second).astype(int)

    # Infer the planning window
    # Convert sim_window from ticks to seconds and velocity from cm/s to m/s;
    # their product, rounded up, is the minimal number of timesteps the plan
    # must cover for one simulation window.
    plan_window_ts = np.ceil(
        (sim_window_tick / ticks_per_second) * (velocity / 100)).astype(int)
    # We need a planning window in all cases because the backup planner might
    # be windowed
    plan_window_ts = np.max([plan_window_ts, planning_window])
    logger.info(f"{planner}: Planning window in timesteps: {plan_window_ts}")

    # Path to the executables, rooted inside or outside the container.
    project_root = pathlib.Path(
        CONTAINER_PROJECT_ROOT if container else PROJECT_ROOT)
    server_path = project_root / SERVER_EXE
    pbs_path = project_root / PBS_EXE
    tpbs_path = project_root / TPBS_EXE
    rhcr_path = project_root / RHCR_EXE
    mass_path = project_root / MASS_EXE

    try:
        logger.info(
            f"Running simulator with {num_agents} agents at port {port_num} ..."
        )
        # Boolean flags are passed lower-cased ("true"/"false").
        server_command = [
            str(server_path),
            f"--num_robots={num_agents}",
            f"--port_number={port_num}",
            f"--output_file={stats_name}",
            f"--save_stats={str(save_stats).lower()}",
            f"--screen={screen}",
            f"--total_sim_step_tick={sim_duration}",
            f"--ticks_per_second={ticks_per_second}",
            f"--look_ahead_dist={look_ahead_dist}",
            f"--look_ahead_tick={look_ahead_tick}",
            f"--seed={seed}",
            f"--sim_window_tick={sim_window_tick}",
            f"--sim_window_timestep={plan_window_ts}",
            f"--plan_window_timestep={plan_window_ts}",
            f"--planner_invoke_policy={planner_invoke_policy}",
            f"--backup_planner={backup_solver}",
            f"--backup_single_agent_solver={single_agent_solver}",
            f"--map={map_filepath}",
            "--grid_type=regular",
            f"--rotation={str(rotation).lower()}",
            f"--rotation_time={rotation_time}",
            f"--save_heuristics_table={str(save_heuristics_table).lower()}",
            f"--task_assigner_type={task_assigner_type}",
        ]
        client_command = [
            "argos3",
            "-c",
            argos_config_filepath,
            # No color displays better in file output
            "--no-color",
        ]

        if planner in ("PBS", "TPBS"):
            # Standard MAPF PBS and Transient MAPF PBS take the same flags
            # and differ only in the executable.
            executable = pbs_path if planner == "PBS" else tpbs_path
            planner_command = [
                str(executable),
                f"--map={map_filepath}",
                f"--agentNum={num_agents}",
                f"--portNum={port_num}",
                f"--seed={seed}",
                f"--screen={screen}",
                f"--cutoffTime={cutoffTime}",
            ]
        # RHCR, typically windowed PBS
        elif planner == "RHCR":
            planner_command = [
                str(rhcr_path),
                f"--map={map_filepath}",
                f"--agentNum={num_agents}",
                f"--port_number={port_num}",
                f"--seed={seed}",
                f"--screen={screen}",
                f"--planning_window={plan_window_ts}",
                f"--simulation_window={plan_window_ts}",
                "--scenario=SMART",
                f"--cutoffTime={cutoffTime}",
                # Lower-cased like every other boolean flag given to RHCR.
                f"--rotation={str(rotation).lower()}",
                f"--solver={solver}",
                f"--single_agent_solver={single_agent_solver}",
                f"--backup_solver={backup_solver}",
                f"--rotation_time={rotation_time}",
                f"--prioritize_start={str(prioritize_start).lower()}",
                f"--suboptimal_bound={suboptimal_bound}",
                f"--log={str(log).lower()}",
                f"--save_result={str(save_result).lower()}",
                f"--save_solver={str(save_solver).lower()}",
                f"--save_heuristics_table={str(save_heuristics_table).lower()}",
            ]
        # MASS, typically PBS with 2nd order dynamics
        elif planner == "MASS":
            planner_command = [
                str(mass_path),
                f"--map={map_filepath}",
                f"--agentNum={num_agents}",
                f"--portNum={port_num}",
                f"--seed={seed}",
                f"--screen={screen}",
                # MASS receives the simulation window in seconds.
                f"--simulation_window={sim_window_tick / ticks_per_second}",
                f"--cutoffTime={cutoffTime}",
                "--saveInstance=False",
            ]
        else:
            logger.error(f"Unknown planner: {planner}")
            # sys.exit rather than the interactive-only ``exit`` helper.
            sys.exit(-1)

        run_simulator(
            args=(server_command, client_command, planner_command),
            # Wall-clock budget: 20x the simulated duration in seconds.
            timeout=20 * sim_duration / ticks_per_second,
            output_log=output_log,
        )
    except KeyboardInterrupt:
        logger.info("KeyboardInterrupt: Stopping the experiment ...")
if __name__ == "__main__":
    # Run as a script: python-fire turns the keyword arguments of
    # run_lifelong_argos into command-line flags.
    fire.Fire(run_lifelong_argos)