Source code for cuvslam.tracker

"""
cuVSLAM Tracker - A wrapper combining Odometry and SLAM functionality.
"""

from typing import Optional, List, Callable, Any, Tuple
from .pycuvslam import Odometry, Slam, Rig, PoseEstimate, Pose, ImuMeasurement


[docs] class Tracker: """ A wrapper that combines cuVSLAM Odometry and SLAM functionality. This class automatically manages both the Odometry and SLAM instances, providing a simplified interface for common use cases. """ # Expose inner classes of Odometry & Slam OdometryMode = Odometry.OdometryMode MulticameraMode = Odometry.MulticameraMode OdometryConfig = Odometry.Config OdometryRGBDSettings = Odometry.RGBDSettings SlamConfig = Slam.Config SlamMetrics = Slam.Metrics SlamLocalizationSettings = Slam.LocalizationSettings def __init__(self, rig: Rig, odom_config: Optional[Odometry.Config] = OdometryConfig(), slam_config: Optional[Slam.Config] = None) -> None: """ Initialize the cuVSLAM system. Args: rig: Camera rig configuration odom_config: Optional odometry configuration (uses defaults if omitted) slam_config: Optional SLAM configuration (disables SLAM if None) """ # need to export observations/landmarks for Slam if slam_config is not None: odom_config.enable_observations_export = True odom_config.enable_landmarks_export = True # Create odometry self.odom = Odometry(rig, odom_config) # Create SLAM primary_cams = self.odom.get_primary_cameras() self.slam = Slam(rig, primary_cams, slam_config) if slam_config is not None else None
[docs] def track(self, timestamp: int, images: List[Any], masks: Optional[List[Any]] = None, depths: Optional[List[Any]] = None) -> Tuple[PoseEstimate, Optional[Pose]]: """ Track a rig pose using current image frame. This method combines tracking and SLAM processing in a single call. Args: timestamp: Images timestamp in nanoseconds images: List of numpy arrays containing the camera images masks: Optional list of numpy arrays containing masks for the images depths: Optional list of numpy arrays containing depth images Returns: PoseEstimate: The computed pose estimate from Odometry Pose: The computed pose estimate from SLAM """ pose_estimate = self.odom.track(timestamp, images, masks, depths) slam_pose = None if self.slam: state = self.odom.get_state() slam_pose = self.slam.track(state) return pose_estimate, slam_pose
[docs] def register_imu_measurement(self, sensor_index: int, imu_measurement: ImuMeasurement) -> None: """Register an IMU measurement with the tracker.""" self.odom.register_imu_measurement(sensor_index, imu_measurement)
[docs] def get_last_observations(self, camera_index: int) -> List[Any]: """Get observations from the last frame.""" return self.odom.get_last_observations(camera_index)
[docs] def get_last_landmarks(self) -> List[Any]: """Get landmarks from the last frame.""" return self.odom.get_last_landmarks()
[docs] def get_last_gravity(self) -> Optional[Any]: """Get gravity vector from the last frame.""" return self.odom.get_last_gravity()
[docs] def get_final_landmarks(self) -> dict[int, Any]: """Get all final landmarks from all frames.""" return self.odom.get_final_landmarks()
[docs] def get_all_slam_poses(self, max_poses_count: int = 0) -> Optional[List[Any]]: """Get all SLAM poses for each frame.""" return self.slam.get_all_slam_poses(max_poses_count) if self.slam else None
def set_slam_pose(self, pose: Pose) -> None: """Set the rig SLAM pose to a value provided by user.""" if self.slam: self.slam.set_slam_pose(pose) def save_map(self, folder_name: str, callback: Callable[[bool], None]) -> None: """Save SLAM database (map) to folder asynchronously.""" if self.slam: self.slam.save_map(folder_name, callback) def localize_in_map(self, folder_name: str, guess_pose: Pose, images: List[Any], settings: Any, callback: Callable[[Optional[Pose], str], None]) -> None: """Localize in the existing database (map) asynchronously.""" if self.slam: self.slam.localize_in_map(folder_name, guess_pose, images, settings, callback)
[docs] def get_pose_graph(self) -> Optional[Any]: """Get pose graph.""" return self.slam.get_pose_graph() if self.slam else None
[docs] def get_slam_metrics(self) -> Optional[Any]: """Get SLAM metrics.""" return self.slam.get_slam_metrics() if self.slam else None
[docs] def get_loop_closure_poses(self) -> Optional[List[Any]]: """Get list of last 10 loop closure poses with timestamps.""" return self.slam.get_loop_closure_poses() if self.slam else None
@staticmethod def merge_maps(rig: Rig, databases: List[str], output_folder: str) -> None: """Merge existing maps into one map.""" Slam.merge_maps(rig, databases, output_folder)