Skip to content

API Reference

This section provides comprehensive technical documentation for the render-tag core modules. Documentation is auto-generated from source code using mkdocstrings.


Fundamental data structures and Pydantic models used throughout the pipeline.

render_tag.core.schema.recipe

Rigid schema for render-tag Scene Recipes.

Following the "Move-Left" architecture, this schema defines exactly what a worker needs to render a single frame, with all random decisions resolved.

Classes

CameraIntrinsics

Bases: BaseModel

Camera intrinsic parameters (baked into final K-matrix).

Source code in src/render_tag/core/schema/recipe.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class CameraIntrinsics(BaseModel):
    """Camera intrinsic parameters (baked into final K-matrix)."""

    resolution: list[int] = Field(
        min_length=2, max_length=2, description="[width, height] in pixels"
    )
    k_matrix: list[list[float]] = Field(
        description=(
            "3x3 Intrinsic matrix [[fx, 0, cx], [0, fy, cy], [0, 0, 1]] — "
            "TARGET distorted camera (used by PnP solvers on output images)"
        )
    )
    fov: float | None = Field(default=None, description="Field of view in degrees")

    # Lens distortion model
    distortion_model: Literal["none", "brown_conrady", "kannala_brandt"] = Field(
        default="none",
        description=(
            "Distortion model: 'none' (pinhole), 'brown_conrady' (5-param radial+tangential),"
            " or 'kannala_brandt' (4-param equidistant fisheye)"
        ),
    )
    distortion_coeffs: list[float] = Field(
        default_factory=list,
        description=(
            "Distortion coefficients: [k1,k2,p1,p2,k3] for brown_conrady;"
            " [k1,k2,k3,k4] for kannala_brandt"
        ),
    )

    # Overscan render targets — set by compiler when distortion is active.
    # None means no distortion; use k_matrix / resolution directly for Blender.
    k_matrix_overscan: list[list[float]] | None = Field(
        default=None,
        description="Expanded K-matrix for linear overscan render passed to Blender",
    )
    resolution_overscan: list[int] | None = Field(
        default=None,
        min_length=2,
        max_length=2,
        description="Expanded [width, height] for linear overscan render passed to Blender",
    )

    # Spherical overscan render targets — set by compiler for kannala_brandt.
    # Mutually exclusive with k_matrix_overscan/resolution_overscan (linear path).
    fov_spherical: float | None = Field(
        default=None,
        description="Full FOV in radians for Blender FISHEYE_EQUIDISTANT camera",
    )
    resolution_spherical: list[int] | None = Field(
        default=None,
        min_length=2,
        max_length=2,
        description="[R, R] square resolution for equidistant intermediate render",
    )

    # Evaluation Margin ("Don't Care" Zone)
    eval_margin_px: int = Field(
        default=0,
        ge=0,
        description=(
            "Pixel-width margin along image edges treated as a 'Don't Care' zone. "
            "Keypoints within this margin receive v=1 (labeled but not visible) in "
            "COCO export instead of v=2. "
            "Typical value: 5px (half-radius of a standard 11-px Gaussian kernel)."
        ),
    )

CameraRecipe

Bases: BaseModel

Recipe for a camera pose and configuration.

Source code in src/render_tag/core/schema/recipe.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
class CameraRecipe(BaseModel):
    """Recipe for a camera pose and configuration."""

    transform_matrix: list[list[float]] = Field(
        description="4x4 Camera-to-World transformation matrix"
    )
    intrinsics: CameraIntrinsics
    sensor_dynamics: SensorDynamicsRecipe | None = None
    fstop: float | None = None
    focus_distance: float | None = None
    min_tag_pixels: int | None = Field(
        default=None, description="Minimum visible tag area in pixels"
    )
    max_tag_pixels: int | None = Field(
        default=None, description="Maximum visible tag area in pixels"
    )
    iso_noise: float | None = None
    sensor_noise: SensorNoiseConfig | None = None
    tone_mapping: Literal["linear", "srgb", "filmic"] = Field(
        default="filmic",
        description="Post-render tone-mapping operator applied before sensor noise.",
    )
    dynamic_range_db: float | None = Field(
        default=None,
        description="Sensor dynamic range in dB. None disables DR clipping.",
    )

LightRecipe

Bases: BaseModel

Specific light source configuration.

Source code in src/render_tag/core/schema/recipe.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
class LightRecipe(BaseModel):
    """Specific light source configuration."""

    type: Literal["POINT", "SUN"] = "POINT"
    location: list[float]
    intensity: float
    radius: float = 0.0
    color: list[float] = Field(default_factory=lambda: [1.0, 1.0, 1.0])
    rotation_euler: list[float] | None = Field(
        default=None,
        min_length=3,
        max_length=3,
        description=(
            "XYZ-Euler rotation in radians. Required for SUN lights to control "
            "direction; ignored for POINT."
        ),
    )

ObjectRecipe

Bases: BaseModel

Recipe for a single object in the scene.

Source code in src/render_tag/core/schema/recipe.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
class ObjectRecipe(BaseModel):
    """Recipe for a single object in the scene."""

    type: str = Field(description="Object type: TAG, BOARD, PLANE, etc.")
    name: str = Field(description="Unique name for the object")
    location: list[float] = Field(min_length=3, max_length=3)
    rotation_euler: list[float] | None = Field(default=None, min_length=3, max_length=3)
    rotation_quaternion: list[float] | None = Field(
        default=None, min_length=4, max_length=4, description="[w, x, y, z]"
    )
    scale: list[float] = Field(default=[1.0, 1.0, 1.0])
    properties: dict[str, Any] = Field(default_factory=dict)
    material: dict[str, Any] | None = None
    texture_path: str | None = None
    board: BoardConfig | None = None
    forward_axis: list[float] | None = Field(
        default=None, min_length=4, max_length=4, description="Local forward vector [x, y, z, 0]"
    )
    keypoints_3d: list[list[float]] | None = Field(
        default=None, description="Standardized 3D keypoints [x, y, z] in local object space"
    )
    calibration_points_3d: list[list[float]] | None = Field(
        default=None,
        description="Optional grid of points (e.g., ChArUco saddle points) in local object space",
    )

SceneRecipe

Bases: BaseModel

Complete instruction set for a single generated scene.

Source code in src/render_tag/core/schema/recipe.py
231
232
233
234
235
236
237
238
239
240
241
class SceneRecipe(BaseModel):
    """Complete instruction set for a single generated scene."""

    model_config = ConfigDict(extra="forbid")

    scene_id: int = Field(description="Unique ID for this scene")
    random_seed: int = Field(description="Resolved seed for this scene")
    world: WorldRecipe = Field(default_factory=WorldRecipe)
    renderer: RendererConfig = Field(default_factory=RendererConfig)
    objects: list[ObjectRecipe] = Field(default_factory=list)
    cameras: list[CameraRecipe] = Field(default_factory=list)

SensorDynamicsRecipe

Bases: BaseModel

Recipe for dynamic sensor artifacts (Motion Blur, Rolling Shutter).

Source code in src/render_tag/core/schema/recipe.py
63
64
65
66
67
68
69
70
class SensorDynamicsRecipe(BaseModel):
    """Recipe for dynamic sensor artifacts (Motion Blur, Rolling Shutter)."""

    velocity: list[float] | None = Field(
        default=None, description="[vx, vy, vz] velocity vector in m/s"
    )
    shutter_time_ms: float | None = None
    rolling_shutter_duration_ms: float | None = None

SensorNoiseComponent

Bases: BaseModel

Single noise layer in a stacked sensor-noise pipeline.

Source code in src/render_tag/core/schema/recipe.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class SensorNoiseComponent(BaseModel):
    """Single noise layer in a stacked sensor-noise pipeline."""

    model: str = Field(default="gaussian")
    mean: float = Field(default=0.0)
    stddev: float = Field(default=0.0, ge=0.0)
    salt_vs_pepper: float = Field(default=0.5, ge=0.0, le=1.0)
    amount: float = Field(default=0.0, ge=0.0, le=1.0)
    scale: float = Field(
        default=1000.0,
        gt=0.0,
        description="Poisson scale factor (photon count per unit intensity).",
    )
    seed: int | None = Field(
        default=None,
        description="Per-component seed. If None, derived from the parent config's seed.",
    )

SensorNoiseConfig

Bases: BaseModel

Configuration for parametric sensor noise.

Backward-compatible: flat fields describe a single-model pipeline (legacy shape). `models` opts into a stacked pipeline where each component is applied in list order — real sensors stack shot + read + quantization noise, and this field lets the schema express that honestly.

If both are present, `models` wins and the flat fields are ignored.

Source code in src/render_tag/core/schema/recipe.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class SensorNoiseConfig(BaseModel):
    """Configuration for parametric sensor noise.

    Backward-compatible: flat fields describe a single-model pipeline (legacy
    shape). ``models`` opts into a stacked pipeline where each component is
    applied in list order — real sensors stack shot + read + quantization
    noise, and this field lets the schema express that honestly.

    If both are present, ``models`` wins and the flat fields are ignored.
    """

    model: str = Field(default="gaussian")
    mean: float = Field(default=0.0)
    stddev: float = Field(default=0.0, ge=0.0)
    salt_vs_pepper: float = Field(default=0.5, ge=0.0, le=1.0)
    amount: float = Field(default=0.0, ge=0.0, le=1.0)
    scale: float = Field(
        default=1000.0,
        gt=0.0,
        description="Poisson scale factor (photon count per unit intensity).",
    )
    seed: int | None = Field(default=None, description="Deterministic noise seed")
    models: list[SensorNoiseComponent] | None = Field(
        default=None,
        description="Stacked noise layers applied in order. Overrides flat fields when set.",
    )

WorldRecipe

Bases: BaseModel

Resolved world environment configuration.

Source code in src/render_tag/core/schema/recipe.py
216
217
218
219
220
221
222
223
224
225
226
227
228
class WorldRecipe(BaseModel):
    """Resolved world environment configuration."""

    background_hdri: str | None = None
    hdri_rotation: float = 0.0

    # Explicit lights (Move-Left: Host decides positions)
    lights: list[LightRecipe] = Field(default_factory=list)

    # Background Texture Plane
    texture_path: str | None = None
    texture_scale: float = 1.0
    texture_rotation: float = 0.0

render_tag.core.schema.job

Job specification and infrastructure schemas for render-tag.

Defines the JobSpec contract used to communicate between the CLI/Host and the rendering backend.

Classes

JobInfrastructure

Bases: BaseModel

Infrastructure settings for the job.

Source code in src/render_tag/core/schema/job.py
31
32
33
34
35
36
37
38
39
class JobInfrastructure(BaseModel):
    """Infrastructure settings for the job."""

    model_config = ConfigDict(frozen=True)

    max_workers: int = Field(default=1, gt=0)
    timeout_seconds: float = Field(default=3600.0, gt=0)
    worker_memory_limit_gb: float | None = None
    max_memory_mb: int | None = None

JobPaths

Bases: BaseModel

Absolute paths for job execution.

Source code in src/render_tag/core/schema/job.py
21
22
23
24
25
26
27
28
class JobPaths(BaseModel):
    """Absolute paths for job execution."""

    model_config = ConfigDict(frozen=True)

    output_dir: Path
    logs_dir: Path
    assets_dir: Path

JobSpec

Bases: BaseModel

Immutable specification for a rendering job.

A JobSpec defines everything needed to execute a complex rendering task, including paths, infrastructure limits, and the scene configuration.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `version` | `str` | The schema version for migration tracking. |
| `job_id` | `str` | Unique identifier for the job. |
| `paths` | `JobPaths` | Absolute paths to output and asset locations. |
| `infrastructure` | `JobInfrastructure` | Resource limits (workers, memory). |
| `global_seed` | `int` | Master seed for all deterministic operations. |
| `scene_config` | `GenConfig` | The procedural generation parameters. |

Source code in src/render_tag/core/schema/job.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class JobSpec(BaseModel):
    """Immutable specification for a rendering job.

    A JobSpec defines everything needed to execute a complex rendering task,
    including paths, infrastructure limits, and the scene configuration.

    Attributes:
        version: The schema version for migration tracking.
        job_id: Unique identifier for the job.
        paths: Absolute paths to output and asset locations.
        infrastructure: Resource limits (workers, memory).
        global_seed: Master seed for all deterministic operations.
        scene_config: The procedural generation parameters.
    """

    model_config = ConfigDict(frozen=True)

    version: str = Field(default=CURRENT_SCHEMA_VERSION, description="Schema version")
    job_id: str
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    paths: JobPaths
    infrastructure: JobInfrastructure = Field(default_factory=JobInfrastructure)

    global_seed: int
    scene_config: GenConfig

    # Metadata for reproducibility
    env_hash: str
    blender_version: str
    assets_hash: str = "unknown"  # Placeholder for now
    config_hash: str | None = None
    shard_index: int = 0
    applied_presets: list[str] = Field(
        default_factory=list,
        description=(
            "Preset names resolved by the ACL in composition order. "
            "Contributes to the job_id hash for reproducibility."
        ),
    )

    @property
    def shard_size(self) -> int:
        return self.scene_config.dataset.num_scenes

    def get_total_shards(self, scenes_per_shard: int) -> int:
        """Calculate the total number of shards needed for this job."""
        num_scenes = self.scene_config.dataset.num_scenes
        return (num_scenes + scenes_per_shard - 1) // scenes_per_shard

    def get_scene_indices(self, scenes_per_shard: int) -> list[int]:
        """Get the deterministic list of scene indices for the current shard."""
        start_idx = self.shard_index * scenes_per_shard
        end_idx = min(start_idx + scenes_per_shard, self.scene_config.dataset.num_scenes)

        if start_idx >= self.scene_config.dataset.num_scenes:
            return []

        return list(range(start_idx, end_idx))

    @classmethod
    def from_json(cls, json_str: str) -> "JobSpec":
        """Deserialize and migrate JobSpec from JSON."""
        data = json.loads(json_str)
        return cls._migrate_and_validate(data)

    @classmethod
    def from_file(cls, path: Path | str) -> "JobSpec":
        """Load and migrate JobSpec from a file (in-memory only)."""
        path = Path(path)
        with open(path) as f:
            data = json.load(f)
        return cls._migrate_and_validate(data)

    @classmethod
    def _migrate_and_validate(cls, data: dict) -> "JobSpec":
        """Apply the ACL to both the outer envelope and the nested scene_config."""
        from render_tag.core.schema_adapter import SchemaMigrator, adapt_config

        data = SchemaMigrator().migrate(data)
        if isinstance(data.get("scene_config"), dict):
            data["scene_config"] = adapt_config(data["scene_config"])
        return cls.model_validate(data)
Functions
from_file classmethod
from_file(path: Path | str) -> JobSpec

Load and migrate JobSpec from a file (in-memory only).

Source code in src/render_tag/core/schema/job.py
108
109
110
111
112
113
114
@classmethod
def from_file(cls, path: Path | str) -> "JobSpec":
    """Load and migrate JobSpec from a file (in-memory only)."""
    path = Path(path)
    with open(path) as f:
        data = json.load(f)
    return cls._migrate_and_validate(data)
from_json classmethod
from_json(json_str: str) -> JobSpec

Deserialize and migrate JobSpec from JSON.

Source code in src/render_tag/core/schema/job.py
102
103
104
105
106
@classmethod
def from_json(cls, json_str: str) -> "JobSpec":
    """Deserialize and migrate JobSpec from JSON."""
    data = json.loads(json_str)
    return cls._migrate_and_validate(data)
get_scene_indices
get_scene_indices(scenes_per_shard: int) -> list[int]

Get the deterministic list of scene indices for the current shard.

Source code in src/render_tag/core/schema/job.py
 92
 93
 94
 95
 96
 97
 98
 99
100
def get_scene_indices(self, scenes_per_shard: int) -> list[int]:
    """Get the deterministic list of scene indices for the current shard."""
    start_idx = self.shard_index * scenes_per_shard
    end_idx = min(start_idx + scenes_per_shard, self.scene_config.dataset.num_scenes)

    if start_idx >= self.scene_config.dataset.num_scenes:
        return []

    return list(range(start_idx, end_idx))
get_total_shards
get_total_shards(scenes_per_shard: int) -> int

Calculate the total number of shards needed for this job.

Source code in src/render_tag/core/schema/job.py
87
88
89
90
def get_total_shards(self, scenes_per_shard: int) -> int:
    """Calculate the total number of shards needed for this job."""
    num_scenes = self.scene_config.dataset.num_scenes
    return (num_scenes + scenes_per_shard - 1) // scenes_per_shard

SeedManager

Manages deterministic seed generation hierarchy.

Source code in src/render_tag/core/schema/job.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class SeedManager:
    """Manages deterministic seed generation hierarchy."""

    def __init__(self, master_seed: int):
        self.master_seed = master_seed

    def get_shard_seed(self, shard_index: int) -> int:
        """Get a deterministic seed for a specific shard index.

        Uses SHA256 hashing of (master_seed, shard_index) to produce
        a deterministic seed in O(1) time.
        """
        # Create a unique string from master seed and shard index
        seed_str = f"{self.master_seed}:{shard_index}"

        # Hash it
        hash_hex = hashlib.sha256(seed_str.encode()).hexdigest()

        # Take the first 8 characters (32 bits) and convert to int
        # This ensures we stay within standard 32-bit seed ranges
        return int(hash_hex[:8], 16)
Functions
get_shard_seed
get_shard_seed(shard_index: int) -> int

Get a deterministic seed for a specific shard index.

Uses SHA256 hashing of (master_seed, shard_index) to produce a deterministic seed in O(1) time.

Source code in src/render_tag/core/schema/job.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def get_shard_seed(self, shard_index: int) -> int:
    """Get a deterministic seed for a specific shard index.

    Uses SHA256 hashing of (master_seed, shard_index) to produce
    a deterministic seed in O(1) time.
    """
    # Create a unique string from master seed and shard index
    seed_str = f"{self.master_seed}:{shard_index}"

    # Hash it
    hash_hex = hashlib.sha256(seed_str.encode()).hexdigest()

    # Take the first 8 characters (32 bits) and convert to int
    # This ensures we stay within standard 32-bit seed ranges
    return int(hash_hex[:8], 16)

Functions

calculate_job_id

calculate_job_id(spec: JobSpec) -> str

Calculates a deterministic SHA256 hash for the given JobSpec.

Source code in src/render_tag/core/schema/job.py
150
151
152
153
154
155
156
157
158
159
160
161
def calculate_job_id(spec: JobSpec) -> str:
    """Calculates a deterministic SHA256 hash for the given JobSpec."""
    # Ensure stable JSON serialization
    # Exclude created_at to allow re-running same config with same ID if needed?
    # Or include it to make every run unique?
    # Usually Job ID should be unique per run.
    # But if we want content-addressable, we should exclude timestamp.
    # Let's include everything for now as a UUID-like.
    # Exclude job_id and created_at to allow deterministic generation
    spec_dict = spec.model_dump(exclude={"job_id", "created_at"}, mode="json")
    spec_json = json.dumps(spec_dict, sort_keys=True)
    return hashlib.sha256(spec_json.encode()).hexdigest()

get_env_fingerprint

get_env_fingerprint(
    root_dir: Path | None = None,
) -> tuple[str, str]

Returns a SHA256 hash of the uv.lock file and the BlenderProc version.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `root_dir` | `Path \| None` | The project root directory. Defaults to the current working directory. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `tuple[str, str]` | A tuple of `(env_hash, blender_version)`. |

Source code in src/render_tag/core/schema/job.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def get_env_fingerprint(root_dir: Path | None = None) -> tuple[str, str]:
    """
    Returns a SHA256 hash of the uv.lock file and the BlenderProc version.

    Args:
        root_dir: The project root directory. Defaults to the current working directory.

    Returns:
        A tuple of (env_hash, blender_version).
    """
    if root_dir is None:
        root_dir = Path.cwd()

    uv_lock_path = root_dir / "uv.lock"

    # Try to find uv.lock in parent directories if not in CWD
    if not uv_lock_path.exists():
        curr = root_dir
        while curr.parent != curr:
            if (curr / "uv.lock").exists():
                uv_lock_path = curr / "uv.lock"
                break
            curr = curr.parent

    env_hash = "unknown"
    if uv_lock_path.exists():
        with open(uv_lock_path, "rb") as f:
            env_hash = hashlib.sha256(f.read()).hexdigest()

    blender_version = "unknown"
    if shutil.which("blenderproc"):
        try:
            result = subprocess.run(
                ["blenderproc", "--version"], capture_output=True, text=True, check=False
            )
            if result.returncode == 0:
                # Output format is usually "BlenderProc X.Y.Z"
                blender_version = result.stdout.strip().split()[-1]
        except Exception:
            pass

    return env_hash, blender_version

render_tag.core.schema.subject

Classes

BoardSubjectConfig

Bases: BaseModel

Configuration for a single calibration board.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `type` | `Literal['BOARD']` | Discriminator for polymorphic schema. |
| `rows` | `PositiveInt` | Number of rows in the grid. |
| `cols` | `PositiveInt` | Number of columns in the grid. |
| `marker_size_mm` | `PositiveFloat` | Edge length of the markers in millimeters. |
| `dictionary` | `str` | Tag family used for markers. |
| `spacing_ratio` | `PositiveFloat \| None` | Ratio of marker size to spacing (AprilGrid only). |
| `square_size_mm` | `PositiveFloat \| None` | Total edge length of a grid cell (ChArUco only). |

Source code in src/render_tag/core/schema/subject.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class BoardSubjectConfig(BaseModel):
    """Configuration for a single calibration board.

    Attributes:
        type: Discriminator for polymorphic schema.
        rows: Number of rows in the grid.
        cols: Number of columns in the grid.
        marker_size_mm: Edge length of the markers in millimeters.
        dictionary: Tag family used for markers.
        spacing_ratio: Ratio of marker size to spacing (AprilGrid only).
        square_size_mm: Total edge length of a grid cell (ChArUco only).
    """

    type: Literal["BOARD"] = "BOARD"
    rows: PositiveInt
    cols: PositiveInt
    marker_size_mm: PositiveFloat
    dictionary: str = "tag36h11"

    # AprilGrid specific
    spacing_ratio: PositiveFloat | None = None

    # ChArUco specific
    square_size_mm: PositiveFloat | None = None

    # Optional quiet zone (white border) around the grid
    quiet_zone_mm: float = Field(default=0.0, ge=0.0)

    # Optional explicit ID mapping
    ids: list[int] | None = None

    model_config = ConfigDict(use_enum_values=True)

    @field_validator("dictionary")
    @classmethod
    def validate_dictionary(cls, v: str) -> str:
        """Reject board dictionaries this environment cannot render."""
        if v not in SUPPORTED_OPENCV_TAG_FAMILIES:
            raise ValueError(f"Unsupported board dictionary: {v}")
        return v

    @model_validator(mode="before")
    @classmethod
    def migrate_units(cls, data: Any) -> Any:
        """Migrate meters to millimeters."""
        if not isinstance(data, dict):
            return data

        if "marker_size" in data:
            data["marker_size_mm"] = data.pop("marker_size") * 1000.0
        if "square_size" in data:
            data["square_size_mm"] = data.pop("square_size") * 1000.0
        return data

    @model_validator(mode="after")
    def validate_board_constraints(self) -> BoardSubjectConfig:
        """Validate that square_size_mm is greater than marker_size_mm for ChArUco."""
        if self.square_size_mm is not None and self.marker_size_mm >= self.square_size_mm:
            raise ValueError("marker_size_mm must be smaller than square_size_mm")
        return self
Functions
migrate_units classmethod
migrate_units(data: Any) -> Any

Migrate meters to millimeters.

Source code in src/render_tag/core/schema/subject.py
 98
 99
100
101
102
103
104
105
106
107
108
109
@model_validator(mode="before")
@classmethod
def migrate_units(cls, data: Any) -> Any:
    """Migrate meters to millimeters."""
    if not isinstance(data, dict):
        return data

    if "marker_size" in data:
        data["marker_size_mm"] = data.pop("marker_size") * 1000.0
    if "square_size" in data:
        data["square_size_mm"] = data.pop("square_size") * 1000.0
    return data
validate_board_constraints
validate_board_constraints() -> BoardSubjectConfig

Validate that square_size_mm is greater than marker_size_mm for ChArUco.

Source code in src/render_tag/core/schema/subject.py
111
112
113
114
115
116
@model_validator(mode="after")
def validate_board_constraints(self) -> BoardSubjectConfig:
    """Validate that square_size_mm is greater than marker_size_mm for ChArUco."""
    if self.square_size_mm is not None and self.marker_size_mm >= self.square_size_mm:
        raise ValueError("marker_size_mm must be smaller than square_size_mm")
    return self
validate_dictionary classmethod
validate_dictionary(v: str) -> str

Reject board dictionaries this environment cannot render.

Source code in src/render_tag/core/schema/subject.py
90
91
92
93
94
95
96
@field_validator("dictionary")
@classmethod
def validate_dictionary(cls, v: str) -> str:
    """Reject board dictionaries this environment cannot render."""
    if v not in SUPPORTED_OPENCV_TAG_FAMILIES:
        raise ValueError(f"Unsupported board dictionary: {v}")
    return v

SubjectConfig

Bases: RootModel

Root model for polymorphic subjects.

Source code in src/render_tag/core/schema/subject.py
120
121
122
123
class SubjectConfig(RootModel):
    """Root model for polymorphic subjects."""

    root: Annotated[TagSubjectConfig | BoardSubjectConfig, Field(discriminator="type")]

TagSubjectConfig

Bases: BaseModel

Configuration for a collection of flying tags.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `type` | `Literal['TAGS']` | Discriminator for polymorphic schema. |
| `tag_families` | `list[str]` | List of tag families to sample from. |
| `size_mm` | `PositiveFloat` | Edge length of the markers in millimeters. |
| `tags_per_scene` | `int \| tuple[int, int]` | Number of markers to generate per scene. |

Source code in src/render_tag/core/schema/subject.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class TagSubjectConfig(BaseModel):
    """Configuration for a collection of flying tags.

    Attributes:
        type: Discriminator for polymorphic schema.
        tag_families: List of tag families to sample from.
        size_mm: Edge length of the markers in millimeters.
        tags_per_scene: Number of markers to generate per scene.
    """

    type: Literal["TAGS"] = "TAGS"
    tag_families: list[str] = Field(default_factory=lambda: ["tag36h11"])
    size_mm: PositiveFloat = 100.0
    tags_per_scene: int | tuple[int, int] = Field(
        default=10, description="Number of markers to generate per scene (or [min, max] range)."
    )
    tag_spacing_bits: float = Field(default=2.0, description="Spacing between tags in bits")

    model_config = ConfigDict(use_enum_values=True)

    @field_validator("tag_families")
    @classmethod
    def validate_tag_families(cls, v: list[str]) -> list[str]:
        """Reject tag families this environment cannot render."""
        unsupported = [family for family in v if family not in SUPPORTED_OPENCV_TAG_FAMILIES]
        if unsupported:
            raise ValueError(f"Unsupported tag families: {unsupported}")
        return v

    @model_validator(mode="before")
    @classmethod
    def migrate_units(cls, data: Any) -> Any:
        """Migrate size_meters to size_mm."""
        if isinstance(data, dict) and "size_meters" in data:
            data["size_mm"] = data.pop("size_meters") * 1000.0
        return data
Functions
migrate_units classmethod
migrate_units(data: Any) -> Any

Migrate size_meters to size_mm.

Source code in src/render_tag/core/schema/subject.py
48
49
50
51
52
53
54
@model_validator(mode="before")
@classmethod
def migrate_units(cls, data: Any) -> Any:
    """Migrate size_meters to size_mm."""
    if isinstance(data, dict) and "size_meters" in data:
        data["size_mm"] = data.pop("size_meters") * 1000.0
    return data
validate_tag_families classmethod
validate_tag_families(v: list[str]) -> list[str]

Reject tag families this environment cannot render.

Source code in src/render_tag/core/schema/subject.py
39
40
41
42
43
44
45
46
@field_validator("tag_families")
@classmethod
def validate_tag_families(cls, v: list[str]) -> list[str]:
    """Reject tag families this environment cannot render."""
    # Keep list order (and duplicates) so the error message mirrors the input.
    unsupported = [family for family in v if family not in SUPPORTED_OPENCV_TAG_FAMILIES]
    if unsupported:
        raise ValueError(f"Unsupported tag families: {unsupported}")
    return v

render_tag.core.schema.board

Classes

BoardConfig

Bases: BaseModel

Configuration for a calibration board.

Keypoint Contract (Top-Left, Clockwise): All exported keypoints_3d arrays follow OpenCV 4.6+ standard. For each marker, the four corners are serialized in this exact index order:

    Index 0: Top-Left     (-X, +Y in Blender local / min-X, min-Y in image)
    Index 1: Top-Right    (+X, +Y in Blender local / max-X, min-Y in image)
    Index 2: Bottom-Right (+X, -Y in Blender local / max-X, max-Y in image)
    Index 3: Bottom-Left  (-X, -Y in Blender local / min-X, max-Y in image)

The winding is strictly Clockwise in image-space (Y-down, OpenCV convention),
which corresponds to a positive Shoelace signed area. The pipeline MUST NOT
re-sort or apply convex-hull algorithms to projected corners; index 0 always
maps to Top-Left regardless of camera rotation.

Calibration points (saddle points / AprilGrid intersections) are serialized
left-to-right across each row, top-to-bottom (row 0 first), matching the
iterator order of both the texture synthesizer and the layout generator.
Source code in src/render_tag/core/schema/board.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class BoardConfig(BaseModel):
    """Configuration for a calibration board.

    Keypoint Contract (Top-Left, Clockwise):
        All exported keypoints_3d arrays follow OpenCV 4.6+ standard. For each
        marker, the four corners are serialized in this exact index order:

            Index 0: Top-Left     (-X, +Y in Blender local / min-X, min-Y in image)
            Index 1: Top-Right    (+X, +Y in Blender local / max-X, min-Y in image)
            Index 2: Bottom-Right (+X, -Y in Blender local / max-X, max-Y in image)
            Index 3: Bottom-Left  (-X, -Y in Blender local / min-X, max-Y in image)

        The winding is strictly Clockwise in image-space (Y-down, OpenCV convention),
        which corresponds to a positive Shoelace signed area. The pipeline MUST NOT
        re-sort or apply convex-hull algorithms to projected corners; index 0 always
        maps to Top-Left regardless of camera rotation.

        Calibration points (saddle points / AprilGrid intersections) are serialized
        left-to-right across each row, top-to-bottom (row 0 first), matching the
        iterator order of both the texture synthesizer and the layout generator.
    """

    # Board layout: kind of board plus grid dimensions.
    type: BoardType
    rows: PositiveInt
    cols: PositiveInt

    # Common parameters
    # marker_size units are not stated here — presumably meters, matching
    # quiet_zone_m below; confirm against the texture synthesizer.
    marker_size: PositiveFloat
    dictionary: str = "tag36h11"

    # AprilGrid specific
    spacing_ratio: PositiveFloat | None = None

    # ChArUco specific
    square_size: PositiveFloat | None = None

    # Optional quiet zone (white border) around the grid, in meters
    quiet_zone_m: float = Field(default=0.0, ge=0.0)

    # Optional explicit ID mapping
    ids: list[int] | None = None

    model_config = ConfigDict(use_enum_values=True)

    @model_validator(mode="after")
    def validate_board_constraints(self) -> "BoardConfig":
        """Validate type-specific constraints for calibration boards.

        Returns:
            The validated BoardConfig instance.

        Raises:
            ValueError: If square_size is missing for ChArUco, spacing_ratio is
                missing for AprilGrid, or if marker_size is not smaller than
                square_size for ChArUco.
        """
        # NOTE(review): with use_enum_values=True, self.type holds the enum's
        # value after validation; comparing it to BoardType members relies on
        # BoardType being a str-valued enum — confirm BoardType's definition.
        if self.type == BoardType.CHARUCO:
            if self.square_size is None:
                raise ValueError("square_size is required for ChArUco")
            if self.marker_size >= self.square_size:
                raise ValueError("marker_size must be smaller than square_size")
        elif self.type == BoardType.APRILGRID:
            if self.spacing_ratio is None:
                raise ValueError("spacing_ratio is required for AprilGrid")
        if self.dictionary not in SUPPORTED_OPENCV_TAG_FAMILIES:
            raise ValueError(f"Unsupported board dictionary: {self.dictionary}")
        return self
Functions
validate_board_constraints
validate_board_constraints() -> BoardConfig

Validate type-specific constraints for calibration boards.

Returns:

Type Description
BoardConfig

The validated BoardConfig instance.

Raises:

Type Description
ValueError

If square_size is missing for ChArUco, spacing_ratio is missing for AprilGrid, or if marker_size is not smaller than square_size for ChArUco.

Source code in src/render_tag/core/schema/board.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@model_validator(mode="after")
def validate_board_constraints(self) -> "BoardConfig":
    """Validate type-specific constraints for calibration boards.

    Returns:
        The validated BoardConfig instance.

    Raises:
        ValueError: If square_size is missing for ChArUco, spacing_ratio is
            missing for AprilGrid, or if marker_size is not smaller than
            square_size for ChArUco.
    """
    # ChArUco embeds a marker inside each checker square, so both sizes are
    # required and the marker must fit strictly inside the square.
    if self.type == BoardType.CHARUCO:
        if self.square_size is None:
            raise ValueError("square_size is required for ChArUco")
        if self.marker_size >= self.square_size:
            raise ValueError("marker_size must be smaller than square_size")
    elif self.type == BoardType.APRILGRID:
        if self.spacing_ratio is None:
            raise ValueError("spacing_ratio is required for AprilGrid")
    # The dictionary must be one the OpenCV-backed renderer supports.
    if self.dictionary not in SUPPORTED_OPENCV_TAG_FAMILIES:
        raise ValueError(f"Unsupported board dictionary: {self.dictionary}")
    return self

BoardDefinition

Bases: BaseModel

Output descriptor shipped in BOARD DetectionRecords.

Unlike BoardConfig (input configuration), this captures the resolved physical parameters needed to instantiate cv2.aruco.CharucoBoard or a Kalibr AprilGrid downstream without external config.

Source code in src/render_tag/core/schema/board.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
class BoardDefinition(BaseModel):
    """Output descriptor shipped in BOARD DetectionRecords.

    Unlike BoardConfig (input configuration), this captures the resolved
    physical parameters needed to instantiate ``cv2.aruco.CharucoBoard`` or
    a Kalibr AprilGrid downstream without external config.
    """

    # Resolved layout; sizes here are explicitly millimeters (unlike
    # BoardConfig, which carries unit-ambiguous marker_size/square_size).
    type: BoardType
    rows: PositiveInt
    cols: PositiveInt
    square_size_mm: float = Field(gt=0)
    marker_size_mm: float = Field(gt=0)
    dictionary: str
    total_keypoints: int = Field(ge=0)
    spacing_ratio: float | None = Field(default=None)

    # frozen=True: definitions are immutable once emitted into records.
    model_config = ConfigDict(use_enum_values=True, frozen=True)

    @model_validator(mode="after")
    def validate_aprilgrid_spacing(self) -> "BoardDefinition":
        """Validate that AprilGrid definitions include spacing_ratio."""
        # NOTE(review): as in BoardConfig, the enum comparison below assumes
        # BoardType is a str-valued enum (use_enum_values=True) — confirm.
        if self.type == BoardType.APRILGRID and self.spacing_ratio is None:
            raise ValueError("spacing_ratio required for AprilGrid board definitions")
        if self.dictionary not in SUPPORTED_OPENCV_TAG_FAMILIES:
            raise ValueError(f"Unsupported board dictionary: {self.dictionary}")
        return self
Functions
validate_aprilgrid_spacing
validate_aprilgrid_spacing() -> BoardDefinition

Validate that AprilGrid definitions include spacing_ratio.

Source code in src/render_tag/core/schema/board.py
101
102
103
104
105
106
107
108
@model_validator(mode="after")
def validate_aprilgrid_spacing(self) -> "BoardDefinition":
    """Validate that AprilGrid definitions include spacing_ratio."""
    # spacing_ratio is what lets downstream code reconstruct an AprilGrid
    # layout; a definition without it would be unusable.
    if self.type == BoardType.APRILGRID and self.spacing_ratio is None:
        raise ValueError("spacing_ratio required for AprilGrid board definitions")
    if self.dictionary not in SUPPORTED_OPENCV_TAG_FAMILIES:
        raise ValueError(f"Unsupported board dictionary: {self.dictionary}")
    return self

Procedural mathematics and scene construction logic.

render_tag.generation.compiler

Deterministic Scene Compiler for render-tag.

Shifts all "decision-making" (random sampling, asset selection, pose calculation) from the Blender runtime to the pure-Python preparation phase.

Classes

SceneCompiler

Compiles a high-level JobSpec/GenConfig into a list of rigid SceneRecipes.

This class owns all randomness and ensures that the resulting recipes are purely execution-ready instructions for a "dumb" worker.

Attributes:

Name Type Description
config

The generation configuration.

global_seed

Master seed for deterministic derivations.

output_dir

Path to storage for recipes and textures.

asset_provider

Resolver for textures and HDRI environments.

strategy SubjectStrategy

Current subject rendering strategy (e.g., TagStrategy).

Source code in src/render_tag/generation/compiler.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
class SceneCompiler:
    """Compiles a high-level JobSpec/GenConfig into a list of rigid SceneRecipes.

    This class owns all randomness and ensures that the resulting recipes
    are purely execution-ready instructions for a "dumb" worker.

    Attributes:
        config: The generation configuration.
        global_seed: Master seed for deterministic derivations.
        output_dir: Path to storage for recipes and textures.
        asset_provider: Resolver for textures and HDRI environments.
        strategy: Current subject rendering strategy (e.g., TagStrategy).
    """

    def __init__(
        self,
        config: GenConfig,
        global_seed: int = 42,
        output_dir: Path | None = None,
        asset_provider: AssetProvider | None = None,
    ):
        """Set up the compiler, its subject strategy, and the texture cache.

        Args:
            config: Generation configuration driving all sampling decisions.
            global_seed: Master seed for deterministic per-scene derivations.
            output_dir: Optional directory for recipes/textures; created
                eagerly (parents included) when provided.
            asset_provider: Optional texture/HDRI path resolver; a default
                AssetProvider is constructed when omitted.
        """
        self.config = config
        self.global_seed = global_seed
        self.output_dir = output_dir
        if self.output_dir is not None:
            self.output_dir.mkdir(parents=True, exist_ok=True)
        self.asset_provider = asset_provider or AssetProvider()

        # Initialize Subject Strategy
        self.strategy: SubjectStrategy = get_subject_strategy(self.config.scenario.subject)

        # If it's a TagStrategy, we might need to synchronize its config with GenConfig
        # (Though ideally SubjectConfig should already be correct)
        # Local import — presumably to avoid a circular dependency; confirm.
        from .strategy.tags import TagStrategy

        if isinstance(self.strategy, TagStrategy):
            # Update strategy config to match GenConfig if needed
            # For now, we assume SubjectConfig is the source of truth for the strategy
            pass

        # Prepare assets for the subject once per compiler instance
        from render_tag.generation.context import GenerationContext

        # Falls back to a relative "output" directory when none was supplied.
        ctx = GenerationContext(
            gen_config=self.config, output_dir=self.output_dir or Path("output")
        )
        self.strategy.prepare_assets(ctx)

        # Cache textures
        # Recursive scan of texture_dir for common raster formats.
        self.textures: list[Path] = []
        if self.config.scene.texture_dir and self.config.scene.texture_dir.exists():
            valid_exts = {".png", ".jpg", ".jpeg", ".tif", ".tiff"}
            self.textures = [
                p
                for p in self.config.scene.texture_dir.rglob("*")
                if p.suffix.lower() in valid_exts
            ]

    def compile_shards(
        self,
        shard_index: int,
        total_shards: int,
        exclude_ids: set[int] | None = None,
        *,
        total_scenes: int | None = None,
        validate: bool = False,
    ) -> list[SceneRecipe]:
        """Compile a specific shard of scenes.

        Args:
            shard_index: The zero-based index of the current shard.
            total_shards: The total number of shards the job is split into.
            exclude_ids: Optional set of scene IDs to skip.
            total_scenes: Total scenes across all shards. Defaults to
                ``config.dataset.num_scenes``; callers that shard a job spec
                with a different scope can override it.
            validate: If True, each scene is built under the retry-on-invalid
                loop (see ``compile_scene``). Defaults to False to preserve
                existing direct-compile behavior.

        Returns:
            A list of compiled SceneRecipe objects for this shard.
        """
        skip = exclude_ids or set()
        scene_count = self.config.dataset.num_scenes if total_scenes is None else total_scenes

        # Never run more shards than scenes; surplus shard indices get nothing.
        if total_shards > scene_count:
            total_shards = scene_count
            if shard_index >= total_shards:
                return []

        # Even split; the last shard absorbs any remainder.
        per_shard = scene_count // total_shards
        first = shard_index * per_shard
        last = scene_count if shard_index == total_shards - 1 else first + per_shard

        return [
            self.compile_scene(idx, validate=validate)
            for idx in range(first, last)
            if idx not in skip
        ]

    def compile_scene(self, scene_id: int, *, validate: bool = False) -> SceneRecipe:
        """Compile a single scene recipe with full determinism.

        Args:
            scene_id: The unique identifier for the scene.
            validate: When True, run ``RecipeValidator`` on each attempt and
                re-sample (with a new ``derive_seed(..., "attempt", n)`` seed)
                until the recipe is free of errors and non-cache warnings.
                Raises ``RuntimeError`` if 50 attempts all fail. Defaults to
                False, which returns the first build deterministically.

        Returns:
            A fully resolved SceneRecipe with all randomness removed.
        """
        scene_seed = derive_seed(self.global_seed, "scene", scene_id)

        # Fast path: no validation loop, deterministic single build.
        if not validate:
            return self._build_recipe(scene_id, scene_seed)

        from ..core.validator import CACHE_PENDING_WARNING_PREFIX, RecipeValidator

        scene_logger = logger.bind(scene_id=scene_id, seed=scene_seed)

        for attempt in range(MAX_VALIDATION_RETRIES):
            # Each attempt gets its own derived seed so re-sampling actually
            # explores a different configuration while staying deterministic.
            attempt_seed = derive_seed(scene_seed, "attempt", attempt)
            recipe = self._build_recipe(scene_id, attempt_seed)

            validator = RecipeValidator(recipe)
            validator.validate()

            # Cache-pending warnings are expected: the TagStrategy references
            # PNGs that prep_stage._pregenerate_tags writes immediately after
            # compilation. They are not a reason to re-sample.
            relevant_warnings = [
                w for w in validator.warnings if CACHE_PENDING_WARNING_PREFIX not in w
            ]

            if not validator.errors and not relevant_warnings:
                return recipe

            scene_logger.debug(
                f"Scene {scene_id} attempt {attempt} failed validation "
                f"(Errors: {len(validator.errors)}, "
                f"Warnings: {len(validator.warnings)}). Re-sampling...",
                attempt=attempt,
            )

        # `validator`/`relevant_warnings` hold the final attempt's results here;
        # assumes MAX_VALIDATION_RETRIES >= 1 — confirm the constant.
        errors_preview = "; ".join(validator.errors[:3]) or "(no errors)"
        warnings_preview = "; ".join(relevant_warnings[:3]) or "(no warnings)"
        raise RuntimeError(
            f"Could not generate a valid scene for ID {scene_id} after "
            f"{MAX_VALIDATION_RETRIES} attempts. "
            f"Last errors: {errors_preview}. Last warnings: {warnings_preview}."
        )

    def save_recipe_json(
        self, recipes: list[SceneRecipe], filename: str = "scene_recipes.json"
    ) -> Path:
        """Serialize a list of recipes to ``{output_dir}/{filename}``.

        Args:
            recipes: Recipes to serialize (dumped via Pydantic ``mode="json"``).
            filename: Target file name inside ``output_dir``.

        Returns:
            Path of the written JSON file.

        Raises:
            ValueError: If the compiler was constructed without ``output_dir``.
        """
        if self.output_dir is None:
            raise ValueError("SceneCompiler.save_recipe_json requires output_dir to be set.")
        path = self.output_dir / filename
        data = [r.model_dump(mode="json") for r in recipes]
        with open(path, "w") as f:
            json.dump(data, f, indent=2)
        return path

    def _build_recipe(self, scene_id: int, seed: int) -> SceneRecipe:
        """Internal build logic that resolves all ranges into absolute values.

        Args:
            scene_id: Unique scene identifier.
            seed: Fully derived scene seed (may already include an attempt salt).

        Returns:
            A SceneRecipe with world, objects, and cameras resolved.
        """
        recipe = SceneRecipe(
            scene_id=scene_id,
            random_seed=seed,
            renderer=self.config.renderer,
        )

        # World gets its own derived seed so its RNG stream is independent
        # of object and camera sampling.
        world_seed = derive_seed(seed, "world", 0)
        recipe.world = self._build_world_recipe(scene_id, world_seed)

        # Local import — presumably avoids a circular dependency; confirm.
        from render_tag.generation.context import GenerationContext

        ctx = GenerationContext(
            gen_config=self.config, output_dir=self.output_dir or Path("output")
        )
        objects = self.strategy.sample_pose(seed, ctx)
        recipe.objects = objects

        # Cameras are sampled last so they can target the placed objects.
        recipe.cameras = self._sample_camera_recipes(scene_id, seed, objects)

        return recipe

    def _build_world_recipe(self, scene_id: int, seed: int) -> WorldRecipe:
        """Build the world/environment part of the recipe.

        Args:
            scene_id: Used to pick a texture from the shuffled pool.
            seed: Seeds the world RNG; per-light seeds are derived from it.

        Returns:
            A WorldRecipe with resolved texture, HDRI, and light parameters.
        """
        rng = np.random.default_rng(seed)
        scene_config = self.config.scene
        lighting_config = scene_config.lighting

        texture_path = None
        texture_scale = 1.0
        texture_rotation = 0.0

        if self.textures:
            # Deterministic shuffle of a copy, then index by scene_id so
            # consecutive scenes cycle through different textures.
            pool = list(self.textures)
            rng.shuffle(pool)
            raw_path = str(pool[scene_id % len(pool)])
            texture_path = str(self.asset_provider.resolve_path(raw_path).absolute())

            min_s = scene_config.texture_scale_min
            max_s = scene_config.texture_scale_max
            # Wide ranges (>10x spread) are sampled log-uniformly so small
            # scales are not under-represented.
            if max_s / min_s > 10.0:
                log_min = math.log(min_s)
                log_max = math.log(max_s)
                texture_scale = math.exp(rng.uniform(log_min, log_max))
            else:
                texture_scale = rng.uniform(min_s, max_s)

            if scene_config.random_texture_rotation:
                texture_rotation = rng.uniform(0, 2 * np.pi)

        background_hdri = None
        if scene_config.background_hdri:
            background_hdri = str(
                self.asset_provider.resolve_path(str(scene_config.background_hdri)).absolute()
            )

        # Three point lights on the upper hemisphere around the origin;
        # each gets its own derived seed so light count changes don't
        # perturb other streams.
        num_lights = 3
        lights = []
        for l_idx in range(num_lights):
            l_seed = derive_seed(seed, "light", l_idx)
            l_rng = np.random.default_rng(l_seed)

            # Spherical placement: azimuth theta, polar phi in (0.1, 0.4)*pi
            # keeps z = r*cos(phi) positive (above the ground plane).
            theta = l_rng.uniform(0, 2 * math.pi)
            phi = l_rng.uniform(0.2, 0.8) * math.pi / 2
            radius = l_rng.uniform(2, 5)

            x = radius * math.sin(phi) * math.cos(theta)
            y = radius * math.sin(phi) * math.sin(theta)
            z = radius * math.cos(phi)

            # Total configured intensity is split evenly across the lights.
            intensity = (
                l_rng.uniform(lighting_config.intensity_min, lighting_config.intensity_max)
                / num_lights
            )

            lights.append(
                LightRecipe(
                    location=[x, y, z],
                    intensity=intensity,
                    radius=l_rng.uniform(lighting_config.radius_min, lighting_config.radius_max),
                    color=[1.0, 1.0, 1.0],
                )
            )

        # Deterministic sun lights come straight from config (no sampling).
        for cfg in lighting_config.directional:
            lights.append(_build_sun_light_recipe(cfg))

        return WorldRecipe(
            background_hdri=background_hdri,
            lights=lights,
            texture_path=texture_path,
            texture_scale=texture_scale,
            texture_rotation=texture_rotation,
        )

    def _calculate_ppm_distance(self, target_tag, np_rng) -> float | None:
        """Solve for the camera distance that achieves a target pixels-per-module.

        Args:
            target_tag: Object recipe the camera should frame (TAG or BOARD).
            np_rng: NumPy Generator used to sample the target PPM within the
                configured constraint range.

        Returns:
            The override distance, or None when no ``ppm_constraint`` is set.
        """
        # Single local import of TAG_GRID_SIZES (the original imported it
        # twice: once in the TAG branch and once before the final call).
        from ..core.geometry.projection_math import solve_distance_for_ppm
        from render_tag.core.constants import TAG_GRID_SIZES

        camera_config = self.config.camera
        if not camera_config.ppm_constraint:
            return None

        # Pinhole focal length in pixels from the horizontal FOV.
        f_px = camera_config.resolution[0] / (2.0 * np.tan(np.radians(camera_config.fov) / 2.0))
        target_ppm = np_rng.uniform(
            camera_config.ppm_constraint.min, camera_config.ppm_constraint.max
        )

        # Use active marker size (black border) for PPM calculation
        tag_size_m = target_tag.properties.get("tag_size", 0.1)
        if target_tag.type == "TAG":
            family = target_tag.properties.get("tag_family", "tag36h11")
            margin_bits = target_tag.properties.get("margin_bits", 0)
            grid_size = TAG_GRID_SIZES.get(family, 8)
            total_bits = grid_size + (2 * margin_bits)
            # Shrink the nominal size to the active (bit-carrying) area.
            tag_size_m = tag_size_m * (grid_size / total_bits)
        elif target_tag.type == "BOARD" and target_tag.board:
            tag_size_m = target_tag.board.marker_size

        return solve_distance_for_ppm(
            target_ppm=target_ppm,
            tag_size_m=tag_size_m,
            focal_length_px=f_px,
            tag_grid_size=TAG_GRID_SIZES.get(
                target_tag.properties.get("tag_family", "tag36h11"), 8
            ),
        )

    def _sample_single_pose(self, np_rng, dist_override, elev_override, target_tag):
        """Sample a single valid camera pose using rejection sampling.

        Args:
            np_rng: NumPy Generator. Consumed in a fixed call order; changing
                the sampling sequence would break determinism downstream.
            dist_override: Fixed camera distance, or None to sample uniformly.
            elev_override: Fixed elevation, or None to use config / sample.
            target_tag: Object the camera should keep in frame (may be None).

        Returns:
            The first candidate pose that passes all constraints, or None if
            all 50 attempts are rejected.
        """
        camera_config = self.config.camera
        scenario = self.config.scenario

        # In random mode, use the target tag location as look-at point to
        # ensure visibility.
        # In sweep modes, we look at the origin [0,0,0] to maintain the
        # geometric contract relative to the center of the world.
        if scenario.sampling_mode == "random" and target_tag:
            look_at = np.array(target_tag.location)
        else:
            look_at = np.array([0.0, 0.0, 0.0])

        for _ in range(50):  # Increased retries for better coverage of edge cases
            # 1. Determine camera location parameters
            dist = (
                dist_override
                if dist_override is not None
                else np_rng.uniform(camera_config.min_distance, camera_config.max_distance)
            )
            # Elevation priority: explicit override > fixed config > sampled.
            elev = (
                elev_override
                if elev_override is not None
                else (
                    camera_config.elevation
                    if camera_config.elevation is not None
                    else np_rng.uniform(camera_config.min_elevation, camera_config.max_elevation)
                )
            )
            azim = (
                camera_config.azimuth
                if camera_config.azimuth is not None
                else np_rng.uniform(0, 2 * np.pi)
            )

            # 2. Sample target position in image frame if in random mode
            target_image_pos = None
            if scenario.sampling_mode == "random" and target_tag:
                # Estimate tag angular size to define a safe sampling margin.
                # Max extent from center is diagonal: size * sqrt(2) / 2 approx 0.707 * size.
                # We use a 1.0x factor to be very safe against roll and perspective distortion.
                tag_size = target_tag.properties.get("tag_size", 0.1)
                if target_tag.type == "BOARD" and target_tag.board:
                    # Boards span multiple markers; use the larger grid extent.
                    tag_size = max(
                        target_tag.board.cols * target_tag.board.marker_size,
                        target_tag.board.rows * target_tag.board.marker_size,
                    )

                f_px = camera_config.resolution[0] / (
                    2.0 * np.tan(np.radians(camera_config.fov) / 2.0)
                )
                pixel_margin = (f_px * tag_size) / dist

                # Only constrain the image position when the margin leaves a
                # non-empty sampling window inside the frame.
                w, h = camera_config.resolution
                if pixel_margin * 2 < min(w, h):
                    u = np_rng.uniform(pixel_margin, w - pixel_margin)
                    v = np_rng.uniform(pixel_margin, h - pixel_margin)
                    target_image_pos = np.array([u, v])

            # 3. Sample roll (skipped when the configured range is degenerate)
            roll = (
                np_rng.uniform(
                    np.radians(camera_config.min_roll), np.radians(camera_config.max_roll)
                )
                if abs(camera_config.max_roll - camera_config.min_roll) > 1e-6
                else 0.0
            )

            # 4. Generate candidate pose
            pose = sample_camera_pose(
                look_at_point=look_at,
                distance=dist,
                elevation=elev,
                azimuth=azim,
                inplane_rot=roll,
                target_image_pos=target_image_pos,
                k_matrix=camera_config.get_k_matrix(),
                rng=np_rng,
            )

            # 5. Validate all constraints
            if self._validate_pose_constraints(pose, target_tag):
                return pose

        return None

    def _sample_camera_recipes(self, scene_id: int, seed: int, objects: list) -> list[CameraRecipe]:
        """Sample multiple camera poses and create recipes.

        Args:
            scene_id: Scene identifier; drives sweep-mode interpolation.
            seed: Scene seed; a dedicated camera seed is derived from it.
            objects: Placed scene objects; TAG objects are preferred targets.

        Returns:
            One CameraRecipe per ``camera_config.samples_per_scene``.

        Raises:
            ValueError: If rejection sampling cannot find a valid pose.
        """
        camera_seed = derive_seed(seed, "camera", 0)
        np_rng = np.random.default_rng(camera_seed)
        camera_config = self.config.camera
        scenario = self.config.scenario

        # Find potential targets for orientation/sizing constraints
        # Prefer actual TAGs, fallback to any object
        all_tags = [obj for obj in objects if obj.type == "TAG"]

        camera_recipes = []

        # ISO-coupled noise is derived once; only the seed differs per camera.
        effective_iso_noise, effective_sensor_noise = derive_iso_coupled_noise(camera_config)
        base_noise_dict = (
            effective_sensor_noise.model_dump() if effective_sensor_noise is not None else None
        )

        for cam_idx in range(camera_config.samples_per_scene):
            # Select target tag for this specific camera sample to maximize diversity
            target_tag = None
            if all_tags:
                target_tag = np_rng.choice(all_tags)
            elif objects:
                target_tag = objects[0]

            dist_override = None
            elev_override = None

            # Sweep modes interpolate distance/elevation linearly over scenes.
            if self.config.dataset.num_scenes > 1:
                t = scene_id / (self.config.dataset.num_scenes - 1)
                if scenario.sampling_mode == "distance":
                    dist_override = camera_config.min_distance + t * (
                        camera_config.max_distance - camera_config.min_distance
                    )
                elif scenario.sampling_mode == "angle":
                    elev_override = camera_config.min_elevation + t * (
                        camera_config.max_elevation - camera_config.min_elevation
                    )

            # PPM constraint only applies if not already in a sweep mode
            if scenario.sampling_mode == "random" and camera_config.ppm_constraint and target_tag:
                dist_override = self._calculate_ppm_distance(target_tag, np_rng)
                # Ensure PPM distance respects configured bounds
                if dist_override is not None:
                    dist_override = np.clip(
                        dist_override, camera_config.min_distance, camera_config.max_distance
                    )

            pose = self._sample_single_pose(np_rng, dist_override, elev_override, target_tag)

            if not pose:
                # Raise instead of emitting an invalid/incomplete recipe when
                # rejection sampling cannot find a valid pose.
                raise ValueError(
                    f"Failed to sample a valid camera pose for scene {scene_id} "
                    f"after 50 attempts with constraints."
                )

            # Optional camera velocity: random direction, Gaussian magnitude
            # clamped at zero (used for motion-blur/rolling-shutter effects).
            velocity = None
            if camera_config.velocity_mean > 0 or camera_config.velocity_std > 0:
                direction = np_rng.normal(size=3)
                norm = np.linalg.norm(direction)
                direction = direction / norm if norm > 1e-6 else np.array([0.0, 0.0, 1.0])
                magnitude = max(
                    0.0, np_rng.normal(camera_config.velocity_mean, camera_config.velocity_std)
                )
                velocity = (direction * magnitude).tolist()

            # Per-camera noise seed keeps noise patterns independent across views.
            if base_noise_dict is not None:
                noise_recipe = {
                    **base_noise_dict,
                    "seed": derive_seed(camera_seed, "noise", cam_idx),
                }
            else:
                noise_recipe = None

            k_matrix = camera_config.get_k_matrix()
            dist_model = camera_config.intrinsics.distortion_model
            dist_coeffs = list(camera_config.intrinsics.get_distortion_coeffs())
            has_dist = has_active_distortion(dist_coeffs)

            # Overscan parameters: fisheye (kannala_brandt) renders via a
            # spherical projection; other models via an enlarged pinhole.
            k_overscan: list[list[float]] | None = None
            res_overscan: list[int] | None = None
            fov_spherical: float | None = None
            res_spherical: list[int] | None = None
            if has_dist:
                if dist_model == "kannala_brandt":
                    fov_sph, (r_w, r_h) = compute_spherical_overscan_params(
                        k_matrix, camera_config.resolution, dist_coeffs
                    )
                    fov_spherical = fov_sph
                    res_spherical = [r_w, r_h]
                else:
                    k_overscan, (w_lin, h_lin) = compute_overscan_intrinsics(
                        k_matrix, camera_config.resolution, dist_coeffs, distortion_model=dist_model
                    )
                    res_overscan = [w_lin, h_lin]

            camera_recipes.append(
                CameraRecipe(
                    transform_matrix=pose.transform_matrix.tolist(),
                    intrinsics=CameraIntrinsics(
                        resolution=list(camera_config.resolution),
                        k_matrix=k_matrix,
                        fov=camera_config.fov,
                        distortion_model=dist_model if has_dist else "none",
                        distortion_coeffs=dist_coeffs if has_dist else [],
                        k_matrix_overscan=k_overscan,
                        resolution_overscan=res_overscan,
                        fov_spherical=fov_spherical,
                        resolution_spherical=res_spherical,
                        eval_margin_px=camera_config.eval_margin_px,
                    ),
                    sensor_dynamics=SensorDynamicsRecipe(
                        velocity=velocity,
                        shutter_time_ms=camera_config.sensor_dynamics.shutter_time_ms,
                        rolling_shutter_duration_ms=camera_config.sensor_dynamics.rolling_shutter_duration_ms,
                    ),
                    fstop=camera_config.fstop,
                    focus_distance=camera_config.focus_distance,
                    min_tag_pixels=camera_config.min_tag_pixels,
                    max_tag_pixels=camera_config.max_tag_pixels,
                    iso_noise=effective_iso_noise,
                    sensor_noise=noise_recipe,
                    tone_mapping=camera_config.tone_mapping,
                    dynamic_range_db=camera_config.dynamic_range_db,
                )
            )
        return camera_recipes

    def _validate_pose_constraints(self, pose, target_tag) -> bool:
        """Validate orientation and sizing constraints for a sampled pose.

        Runs three gates against ``target_tag`` as seen from ``pose``:

        1. Orientation: the tag's local +Z normal must face the camera.
        2. Visibility: all four projected corners must lie inside the frame.
        3. Sizing (only if configured): the projected pixel area must fall
           within the configured min/max bounds.

        Args:
            pose: Sampled camera pose; must expose ``location`` and
                ``transform_matrix``.
            target_tag: Tag/board placement; must expose ``type``, ``board``,
                ``properties``, ``location``, ``rotation_euler`` and
                ``scale``. A falsy value skips validation entirely.

        Returns:
            True if the pose satisfies every applicable constraint.
        """
        if not target_tag:
            return True

        # Imported lazily here (rather than at module top) — presumably to
        # avoid an import cycle or defer heavy geometry deps; confirm before
        # hoisting.
        from ..core.config import get_min_pixel_area
        from ..core.geometry.projection_math import (
            calculate_pixel_area,
            get_world_matrix,
            project_points,
        )

        camera_config = self.config.camera

        # 1. Orientation check
        tag_world_mat = get_world_matrix(
            target_tag.location, target_tag.rotation_euler, target_tag.scale
        )
        tag_normal = tag_world_mat[:3, 2]  # Z-up normal
        if not is_facing_camera(
            tag_location=np.array(target_tag.location),
            tag_normal=tag_normal,
            camera_location=pose.location,
            min_dot=0.1,  # Relaxed to ~84 degrees to support low-elevation tests/views
        ):
            return False

        # 2. Mandatory Frame Visibility check
        # All 4 corners of the tag (or target object) must be within the frame.
        # Half-extents (hw, hh) are in the tag's local frame, before the world
        # transform applies any scale.
        if target_tag.type == "BOARD" and target_tag.board:
            # Rigid Calibration Board (Strategy uses scale in recipe)
            hw, hh = 0.5, 0.5
        elif target_tag.type == "BOARD":
            # Background Board (TagStrategy uses unit scale in recipe)
            cols = target_tag.properties.get("cols", 1)
            rows = target_tag.properties.get("rows", 1)
            sq = target_tag.properties.get("square_size", 0.1)
            hw = (cols * sq) / 2.0
            hh = (rows * sq) / 2.0
        else:
            # Individual Tag
            size = target_tag.properties.get("tag_size", 0.1)
            hw = hh = size / 2.0

        # Corner order: BL, BR, TR, TL in the tag plane (z = 0 locally).
        corners_local = np.array([[-hw, -hh, 0], [hw, -hh, 0], [hw, hh, 0], [-hw, hh, 0]])
        corners_world = (tag_world_mat @ np.hstack([corners_local, np.ones((4, 1))]).T).T[:, :3]
        dist_coeffs = list(camera_config.intrinsics.get_distortion_coeffs())
        pixels = project_points(
            corners_world,
            pose.transform_matrix,
            list(camera_config.resolution),
            camera_config.get_k_matrix(),
            distortion_coeffs=dist_coeffs or None,
            distortion_model=camera_config.intrinsics.distortion_model,
        )

        # Strict boundary check (all corners must be visible)
        # NOTE(review): the upper bound is inclusive (px == W passes); pixel
        # centers normally span [0, W-1] — confirm the extra pixel of slack
        # is intentional.
        if not all(
            0 <= px <= camera_config.resolution[0] and 0 <= py <= camera_config.resolution[1]
            for px, py in pixels
        ):
            return False

        # 3. Optional Sizing constraints validation (Pixel Area)
        # A configured value of 0 is treated as "unset" here (falsy), so the
        # fallback defaults apply in that case.
        if camera_config.min_tag_pixels or camera_config.max_tag_pixels:
            family = target_tag.properties.get("tag_family", "tag36h11")
            min_allowed = camera_config.min_tag_pixels or get_min_pixel_area(family)
            max_allowed = camera_config.max_tag_pixels or (
                camera_config.resolution[0] * camera_config.resolution[1]
            )

            if not (min_allowed <= calculate_pixel_area(pixels) <= max_allowed):
                return False

        return True
Functions
compile_scene
compile_scene(
    scene_id: int, *, validate: bool = False
) -> SceneRecipe

Compile a single scene recipe with full determinism.

Parameters:

Name Type Description Default
scene_id int

The unique identifier for the scene.

required
validate bool

When True, run RecipeValidator on each attempt and re-sample (with a new derive_seed(..., "attempt", n) seed) until the recipe is free of errors and non-cache warnings. Raises RuntimeError if 50 attempts all fail. Defaults to False, which returns the first build deterministically.

False

Returns:

Type Description
SceneRecipe

A fully resolved SceneRecipe with all randomness removed.

Source code in src/render_tag/generation/compiler.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
def compile_scene(self, scene_id: int, *, validate: bool = False) -> SceneRecipe:
    """Build the fully resolved, deterministic recipe for one scene.

    Args:
        scene_id: Unique identifier of the scene to compile.
        validate: When True, each candidate recipe is checked with
            ``RecipeValidator``; invalid attempts are re-sampled under a
            fresh ``derive_seed(..., "attempt", n)`` seed until one is free
            of errors and non-cache warnings. When False (the default) the
            first deterministic build is returned unchecked.

    Returns:
        A fully resolved SceneRecipe with all randomness removed.

    Raises:
        RuntimeError: If ``validate`` is True and no valid recipe is found
            within ``MAX_VALIDATION_RETRIES`` attempts.
    """
    scene_seed = derive_seed(self.global_seed, "scene", scene_id)

    # Fast path: no validation requested — the first build wins.
    if not validate:
        return self._build_recipe(scene_id, scene_seed)

    from ..core.validator import CACHE_PENDING_WARNING_PREFIX, RecipeValidator

    scene_logger = logger.bind(scene_id=scene_id, seed=scene_seed)

    for attempt in range(MAX_VALIDATION_RETRIES):
        recipe = self._build_recipe(scene_id, derive_seed(scene_seed, "attempt", attempt))

        checker = RecipeValidator(recipe)
        checker.validate()

        # Cache-pending warnings are expected: the TagStrategy references
        # PNGs that prep_stage._pregenerate_tags writes immediately after
        # compilation, so they never trigger a re-sample.
        blocking_warnings = [
            msg for msg in checker.warnings if CACHE_PENDING_WARNING_PREFIX not in msg
        ]

        if not checker.errors and not blocking_warnings:
            return recipe

        scene_logger.debug(
            f"Scene {scene_id} attempt {attempt} failed validation "
            f"(Errors: {len(checker.errors)}, "
            f"Warnings: {len(checker.warnings)}). Re-sampling...",
            attempt=attempt,
        )

    # Retry budget exhausted: surface the diagnostics from the final attempt.
    errors_preview = "; ".join(checker.errors[:3]) or "(no errors)"
    warnings_preview = "; ".join(blocking_warnings[:3]) or "(no warnings)"
    raise RuntimeError(
        f"Could not generate a valid scene for ID {scene_id} after "
        f"{MAX_VALIDATION_RETRIES} attempts. "
        f"Last errors: {errors_preview}. Last warnings: {warnings_preview}."
    )
compile_shards
compile_shards(
    shard_index: int,
    total_shards: int,
    exclude_ids: set[int] | None = None,
    *,
    total_scenes: int | None = None,
    validate: bool = False,
) -> list[SceneRecipe]

Compile a specific shard of scenes.

Parameters:

Name Type Description Default
shard_index int

The zero-based index of the current shard.

required
total_shards int

The total number of shards the job is split into.

required
exclude_ids set[int] | None

Optional set of scene IDs to skip.

None
total_scenes int | None

Total scenes across all shards. Defaults to config.dataset.num_scenes; callers that shard a job spec with a different scope can override it.

None
validate bool

If True, each scene is built under the retry-on-invalid loop (see compile_scene). Defaults to False to preserve existing direct-compile behavior.

False

Returns:

Type Description
list[SceneRecipe]

A list of compiled SceneRecipe objects for this shard.

Source code in src/render_tag/generation/compiler.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def compile_shards(
    self,
    shard_index: int,
    total_shards: int,
    exclude_ids: set[int] | None = None,
    *,
    total_scenes: int | None = None,
    validate: bool = False,
) -> list[SceneRecipe]:
    """Compile the contiguous slice of scenes belonging to one shard.

    Args:
        shard_index: The zero-based index of the current shard.
        total_shards: The total number of shards the job is split into.
        exclude_ids: Optional set of scene IDs to skip.
        total_scenes: Total scenes across all shards. Defaults to
            ``config.dataset.num_scenes``; callers sharding a job spec with
            a different scope can override it.
        validate: If True, each scene is built under the retry-on-invalid
            loop (see ``compile_scene``). Defaults to False to preserve
            existing direct-compile behavior.

    Returns:
        A list of compiled SceneRecipe objects for this shard.
    """
    skip = exclude_ids or set()
    scene_count = total_scenes if total_scenes is not None else self.config.dataset.num_scenes

    # More shards than scenes: collapse to one shard per scene; surplus
    # shards get nothing at all.
    shard_count = total_shards
    if shard_count > scene_count:
        shard_count = scene_count
        if shard_index >= shard_count:
            return []

    # Even split; the final shard absorbs the remainder.
    per_shard = scene_count // shard_count
    first = shard_index * per_shard
    last = scene_count if shard_index == shard_count - 1 else first + per_shard

    return [
        self.compile_scene(scene_id, validate=validate)
        for scene_id in range(first, last)
        if scene_id not in skip
    ]
save_recipe_json
save_recipe_json(
    recipes: list[SceneRecipe],
    filename: str = "scene_recipes.json",
) -> Path

Serialize a list of recipes to {output_dir}/{filename}.

Source code in src/render_tag/generation/compiler.py
381
382
383
384
385
386
387
388
389
390
391
def save_recipe_json(
    self, recipes: list[SceneRecipe], filename: str = "scene_recipes.json"
) -> Path:
    """Write ``recipes`` as pretty-printed JSON to ``{output_dir}/{filename}``.

    Args:
        recipes: Compiled recipes to serialize (via ``model_dump(mode="json")``).
        filename: Output file name inside ``output_dir``.

    Returns:
        The path of the written JSON file.

    Raises:
        ValueError: If the compiler was created without an ``output_dir``.
    """
    if self.output_dir is None:
        raise ValueError("SceneCompiler.save_recipe_json requires output_dir to be set.")
    target = self.output_dir / filename
    payload = [recipe.model_dump(mode="json") for recipe in recipes]
    target.write_text(json.dumps(payload, indent=2))
    return target

Functions

compute_overscan_intrinsics

compute_overscan_intrinsics(
    k_target: list[list[float]],
    resolution: tuple[int, int],
    distortion_coeffs: list[float],
    distortion_model: str = "brown_conrady",
    n_samples: int = 32,
) -> tuple[list[list[float]], tuple[int, int]]

Compute the linear overscan K-matrix and resolution needed to cover all rays sampled by the distorted target image.

Samples the 4 edges of the target image at n_samples points each and applies iterative inverse distortion to find the maximum undistorted angular extent. The returned overscan K and resolution guarantee that Blender's linear render fully covers the field needed for the post-warp.

Parameters:

Name Type Description Default
k_target list[list[float]]

3x3 target K-matrix [[fx,0,cx],[0,fy,cy],[0,0,1]].

required
resolution tuple[int, int]

(width, height) of the target distorted image.

required
distortion_coeffs list[float]

Distortion coefficients for the model.

required
distortion_model str

'brown_conrady' or 'kannala_brandt'.

'brown_conrady'
n_samples int

Number of sample points per edge.

32

Returns:

Type Description
(k_linear, (W_lin, H_lin))

Overscan K-matrix and pixel dimensions.

Source code in src/render_tag/generation/compiler.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def compute_overscan_intrinsics(
    k_target: list[list[float]],
    resolution: tuple[int, int],
    distortion_coeffs: list[float],
    distortion_model: str = "brown_conrady",
    n_samples: int = 32,
) -> tuple[list[list[float]], tuple[int, int]]:
    """Derive the linear overscan K-matrix and resolution that cover every
    ray sampled by the distorted target image.

    The 4 edges of the target image are sampled at ``n_samples`` points each
    and pushed through OpenCV's iterative inverse distortion to find the
    widest undistorted angular extent. The resulting overscan K and
    resolution guarantee that Blender's linear render fully covers the field
    needed for the post-warp.

    Args:
        k_target: 3x3 target K-matrix [[fx,0,cx],[0,fy,cy],[0,0,1]].
        resolution: (width, height) of the target distorted image.
        distortion_coeffs: Distortion coefficients for the model.
        distortion_model: 'brown_conrady' or 'kannala_brandt'.
        n_samples: Number of sample points per edge.

    Returns:
        (k_linear, (W_lin, H_lin)): Overscan K-matrix and pixel dimensions.
    """
    width, height = resolution
    fx = k_target[0][0]
    fy = k_target[1][1]

    # Boundary sample points: top edge, bottom edge, left edge, right edge.
    us = np.linspace(0, width - 1, n_samples)
    vs = np.linspace(0, height - 1, n_samples)
    edge_zeros = np.zeros(n_samples)
    u_all = np.concatenate([us, us, edge_zeros, np.full(n_samples, width - 1)])
    v_all = np.concatenate([edge_zeros, np.full(n_samples, height - 1), vs, vs])

    # Delegate inverse distortion to OpenCV's C++ solver, matching what
    # compute_distortion_maps does elsewhere in the pipeline.
    K = np.array(k_target, dtype=np.float64)
    D = np.array(distortion_coeffs, dtype=np.float64)
    boundary = np.stack([u_all, v_all], axis=-1).reshape(-1, 1, 2).astype(np.float64)
    if distortion_model == "kannala_brandt":
        ideal = cv2.fisheye.undistortPoints(boundary, K, D)
    else:
        ideal = cv2.undistortPoints(boundary, K, D)

    # Largest normalized-coordinate magnitude per axis over the boundary.
    max_x = float(np.max(np.abs(ideal[:, 0, 0])))
    max_y = float(np.max(np.abs(ideal[:, 0, 1])))

    # Overscan resolution: the 2*ceil(...) form yields even dimensions for
    # codec compatibility; never shrink below the target image.
    w_lin = max(2 * math.ceil(max_x * fx + 1), width)
    h_lin = max(2 * math.ceil(max_y * fy + 1), height)

    # Principal point sits at the center of the overscan frame.
    k_linear: list[list[float]] = [
        [fx, 0.0, w_lin / 2.0],
        [0.0, fy, h_lin / 2.0],
        [0.0, 0.0, 1.0],
    ]

    return k_linear, (w_lin, h_lin)

compute_spherical_overscan_params

compute_spherical_overscan_params(
    k_target: list[list[float]],
    resolution: tuple[int, int],
    distortion_coeffs: list[float],
    margin_deg: float = 2.0,
    n_samples: int = 32,
) -> tuple[float, tuple[int, int]]

Compute the FOV and square resolution for a Blender FISHEYE_EQUIDISTANT intermediate render.

In the equidistant model, pixel radius is proportional to incidence angle θ, which stays bounded for any physically realisable lens (unlike tan(θ) in the pinhole model). This makes it the correct intermediate representation for Kannala-Brandt fisheye lenses.

Samples the 4 edges of the target image, unprojects through the inverse Kannala-Brandt model to ideal normalised rays, converts to incidence angles θ = atan(‖ray‖), and adds a safety margin before computing render parameters.

Parameters:

Name Type Description Default
k_target list[list[float]]

3x3 target K-matrix [[fx,0,cx],[0,fy,cy],[0,0,1]].

required
resolution tuple[int, int]

(width, height) of the target distorted image.

required
distortion_coeffs list[float]

Kannala-Brandt coefficients [k1, k2, k3, k4].

required
margin_deg float

Angular margin in degrees added beyond θ_max (default 2°).

2.0
n_samples int

Number of sample points per edge.

32

Returns:

Type Description
(fov_spherical, (R, R))

full FOV in radians and square render resolution.

Source code in src/render_tag/generation/compiler.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def compute_spherical_overscan_params(
    k_target: list[list[float]],
    resolution: tuple[int, int],
    distortion_coeffs: list[float],
    margin_deg: float = 2.0,
    n_samples: int = 32,
) -> tuple[float, tuple[int, int]]:
    """Derive the FOV and square resolution for a Blender FISHEYE_EQUIDISTANT
    intermediate render.

    In the equidistant model, pixel radius is proportional to the incidence
    angle θ, which stays bounded for any physically realisable lens (unlike
    tan(θ) in the pinhole model) — making it the right intermediate
    representation for Kannala-Brandt fisheye lenses.

    The 4 edges of the target image are sampled, unprojected through the
    inverse Kannala-Brandt model to ideal normalised rays, converted to
    incidence angles θ = atan(‖ray‖), and padded with a safety margin before
    the render parameters are computed.

    Args:
        k_target: 3x3 target K-matrix [[fx,0,cx],[0,fy,cy],[0,0,1]].
        resolution: (width, height) of the target distorted image.
        distortion_coeffs: Kannala-Brandt coefficients [k1, k2, k3, k4].
        margin_deg: Angular margin in degrees added beyond θ_max (default 2°).
        n_samples: Number of sample points per edge.

    Returns:
        (fov_spherical, (R, R)): full FOV in radians and square render resolution.
    """
    width, height = resolution
    fx = k_target[0][0]

    # Boundary sample points: top edge, bottom edge, left edge, right edge.
    us = np.linspace(0, width - 1, n_samples)
    vs = np.linspace(0, height - 1, n_samples)
    edge_zeros = np.zeros(n_samples)
    u_all = np.concatenate([us, us, edge_zeros, np.full(n_samples, width - 1)])
    v_all = np.concatenate([edge_zeros, np.full(n_samples, height - 1), vs, vs])

    K = np.array(k_target, dtype=np.float64)
    D = np.array(distortion_coeffs, dtype=np.float64)
    boundary = np.stack([u_all, v_all], axis=-1).reshape(-1, 1, 2).astype(np.float64)

    ideal = cv2.fisheye.undistortPoints(boundary, K, D)
    rho = np.sqrt(ideal[:, 0, 0] ** 2 + ideal[:, 0, 1] ** 2)
    theta = np.arctan(rho)  # incidence angle; safe for rho → ∞ (→ π/2)

    theta_max_render = float(np.max(theta)) + math.radians(margin_deg)

    # Resolution: keep angular density ≥ fx pixels/radian (same as target image).
    side = 2 * math.ceil(fx * theta_max_render)
    return 2.0 * theta_max_render, (side, side)

derive_iso_coupled_noise

derive_iso_coupled_noise(
    camera_config: CameraConfig,
) -> tuple[float, SensorNoiseConfig | None]

Derive effective (iso_noise, sensor_noise) from camera.iso.

Returns the user-configured values unchanged when iso_coupling is False. When coupling is enabled, fills only fields the user left at their schema defaults so explicit overrides always win.

Source code in src/render_tag/generation/compiler.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def derive_iso_coupled_noise(
    camera_config: CameraConfig,
) -> tuple[float, SensorNoiseConfig | None]:
    """Derive effective (iso_noise, sensor_noise) from ``camera.iso``.

    Returns the user-configured values unchanged when ``iso_coupling`` is
    False. When coupling is enabled, only fields left at their schema
    defaults are filled in, so explicit user overrides always win.
    """
    # Coupling disabled: pass the configured values straight through.
    if not camera_config.iso_coupling:
        return camera_config.iso_noise, camera_config.sensor_noise

    iso = camera_config.iso

    # iso_noise: 0.0 is the schema default, so derive it from the ISO gain
    # window; any other value is an explicit override.
    effective_iso_noise = camera_config.iso_noise
    if effective_iso_noise == 0.0:
        gain_span = ISO_COUPLING_GAIN_CEILING - ISO_COUPLING_GAIN_FLOOR
        normalised = (iso - ISO_COUPLING_GAIN_FLOOR) / gain_span
        effective_iso_noise = float(np.clip(normalised, 0.0, 1.0))

    # sensor_noise: None means "unset", so synthesize a Gaussian model whose
    # stddev scales with ISO relative to the base-100 setting.
    effective_sensor_noise: SensorNoiseConfig | None = camera_config.sensor_noise
    if effective_sensor_noise is None:
        effective_sensor_noise = SensorNoiseConfig(
            model="gaussian",
            stddev=ISO_COUPLING_BASE_SIGMA * (iso / 100.0) ** ISO_COUPLING_ALPHA,
        )

    return effective_iso_noise, effective_sensor_noise

Worker pool management, ZMQ communication, and parallel rendering.

render_tag.orchestration.orchestrator

Unified orchestration engine for render-tag.

Handles worker pool management, sharding, and parallel execution.

Classes

OrchestratorConfig dataclass

Immutable configuration for the UnifiedWorkerOrchestrator.

Attributes:

Name Type Description
num_workers int

Number of parallel Blender processes to maintain.

base_port int

Starting port for ZMQ communication.

blender_script Path | None

Path to the worker bootstrap script.

blender_executable str | None

Path to the Blender or BlenderProc binary.

use_blenderproc bool

Whether to use the BlenderProc wrapper.

mock bool

If True, uses mocks instead of a real Blender process.

vram_threshold_mb float | None

VRAM limit for preventative worker restarts.

ephemeral bool

If True, workers are optimized for short-lived jobs.

max_renders_per_worker int | None

Restart worker after this many renders.

worker_id_prefix str

Prefix for naming worker processes.

seed int

Global random seed for deterministic generation.

memory_limit_mb int | None

Soft RAM limit per worker process.

Source code in src/render_tag/orchestration/orchestrator.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@dataclass(frozen=True)
class OrchestratorConfig:
    """Immutable configuration for the UnifiedWorkerOrchestrator.

    Attributes:
        num_workers: Number of parallel Blender processes to maintain.
        base_port: Starting port for ZMQ communication.
        blender_script: Path to the worker bootstrap script.
        blender_executable: Path to the Blender or BlenderProc binary.
        use_blenderproc: Whether to use the BlenderProc wrapper.
        mock: If True, uses mocks instead of a real Blender process.
        vram_threshold_mb: VRAM limit for preventative worker restarts.
        ephemeral: If True, workers are optimized for short-lived jobs.
        max_renders_per_worker: Restart worker after this many renders.
        worker_id_prefix: Prefix for naming worker processes.
        seed: Global random seed for deterministic generation.
        memory_limit_mb: Soft RAM limit per worker process.
    """

    num_workers: int = 1
    base_port: int = 20000
    blender_script: Path | None = None
    blender_executable: str | None = None
    use_blenderproc: bool = True
    mock: bool = False
    vram_threshold_mb: float | None = None
    ephemeral: bool = False
    max_renders_per_worker: int | None = None
    worker_id_prefix: str = "worker"
    seed: int = 42
    memory_limit_mb: int | None = None

    def __post_init__(self):
        """Resolve defaults for blender_script and blender_executable."""
        # RENDER_TAG_FORCE_MOCK=1 forces mock mode regardless of the flag.
        force_mock = os.environ.get("RENDER_TAG_FORCE_MOCK") == "1"
        if self.blender_script is None:
            # Default bootstrap script lives three directories above this
            # module, under scripts/. Frozen dataclass, hence __setattr__.
            bootstrap = Path(__file__).resolve().parents[3] / "scripts" / "worker_bootstrap.py"
            object.__setattr__(self, "blender_script", bootstrap)
        if self.blender_executable is None:
            # Mock runs reuse the current interpreter; real runs launch blenderproc.
            chosen = sys.executable if (self.mock or force_mock) else "blenderproc"
            object.__setattr__(self, "blender_executable", chosen)
Functions
__post_init__
__post_init__()

Resolve defaults for blender_script and blender_executable.

Source code in src/render_tag/orchestration/orchestrator.py
89
90
91
92
93
94
95
96
97
def __post_init__(self):
    """Resolve defaults for blender_script and blender_executable."""
    # Frozen dataclass: fields can only be set via object.__setattr__.
    if self.blender_script is None:
        # Default bootstrap script lives three directories above this module,
        # under scripts/worker_bootstrap.py.
        default = Path(__file__).resolve().parents[3] / "scripts" / "worker_bootstrap.py"
        object.__setattr__(self, "blender_script", default)
    # RENDER_TAG_FORCE_MOCK=1 forces mock mode regardless of the config flag.
    mock = self.mock or (os.environ.get("RENDER_TAG_FORCE_MOCK") == "1")
    if self.blender_executable is None:
        # Mock runs reuse the current interpreter; real runs launch blenderproc.
        exe = sys.executable if mock else "blenderproc"
        object.__setattr__(self, "blender_executable", exe)

UnifiedWorkerOrchestrator

Manages a pool of persistent Blender workers.

Source code in src/render_tag/orchestration/orchestrator.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
class UnifiedWorkerOrchestrator:
    """Manages a pool of persistent Blender workers.

    Workers are checked out one at a time via ``get_worker`` and returned
    through ``release_worker``, which restarts any worker that has hit its
    render quota, tripped a resource limit, or gone unresponsive. Every
    instance registers itself in ``_instances`` so ``cleanup_all`` can shut
    down all active pools in one call.
    """

    # Registry of live orchestrators; consumed (and cleared) by cleanup_all().
    _instances: ClassVar[list["UnifiedWorkerOrchestrator"]] = []

    def __init__(
        self,
        config: OrchestratorConfig | None = None,
        log_path: Path | None = None,
        **kwargs,
    ):
        """Create an orchestrator (does not start workers; see ``start``).

        Args:
            config: Full orchestrator configuration. If omitted, one is
                built from ``**kwargs``.
            log_path: Optional path for the health monitor's telemetry log.
            **kwargs: Forwarded to ``OrchestratorConfig`` when ``config``
                is None (backwards-compatible construction path).
        """
        if config is None:
            # Fallback for backwards compatibility or quick instantiation
            config = OrchestratorConfig(**kwargs)
        self.config = config
        self.log_path = log_path

        # Environment override lets tests force mock mode regardless of config.
        self.mock = self.config.mock or (os.environ.get("RENDER_TAG_FORCE_MOCK") == "1")
        self.blender_script = self.config.blender_script
        self.blender_executable = self.config.blender_executable

        self.job_id = str(uuid.uuid4())
        # zmq appears to be an optional dependency here; without it there is
        # no IPC context and context-dependent paths are skipped.
        self.context = zmq.Context() if zmq else None
        self.thread_budget = get_thread_budget(num_workers=self.config.num_workers)
        # workers: all live PersistentWorkerProcess objects;
        # worker_queue: the subset currently idle and available for checkout.
        self.workers, self.worker_queue = [], queue.Queue()
        self.monitor: HealthMonitor | None = None
        self.auditor = TelemetryAuditor()
        # _lock serializes start()/stop(); running guards against double start/stop.
        self._lock, self.running = threading.Lock(), False
        # Owns worker resources for deterministic teardown in stop().
        self._resource_stack = ResourceStack()
        UnifiedWorkerOrchestrator._instances.append(self)

    @property
    def num_workers(self) -> int:
        """Number of worker processes in the pool (from config)."""
        return self.config.num_workers

    @property
    def base_port(self) -> int:
        """Base TCP port from which per-worker ports are derived."""
        return self.config.base_port

    @property
    def use_blenderproc(self) -> bool:
        """Whether workers are launched through blenderproc."""
        return self.config.use_blenderproc

    @property
    def vram_threshold_mb(self) -> float | None:
        """VRAM usage (MB) above which a worker is preventatively restarted."""
        return self.config.vram_threshold_mb

    @property
    def ephemeral(self) -> bool:
        """Config flag; not read inside this class — presumably used by callers."""
        return self.config.ephemeral

    @property
    def max_renders_per_worker(self) -> int | None:
        """Render count after which a worker is recycled, or None for unlimited."""
        return self.config.max_renders_per_worker

    @property
    def worker_id_prefix(self) -> str:
        """Prefix used when naming worker processes."""
        return self.config.worker_id_prefix

    @property
    def seed(self) -> int:
        """Base RNG seed forwarded to workers."""
        return self.config.seed

    @property
    def memory_limit_mb(self) -> int | None:
        """Explicit per-job memory cap (MB), or None to auto-budget."""
        return self.config.memory_limit_mb

    @classmethod
    def cleanup_all(cls):
        """Stop all active orchestrator instances and their workers."""
        # Iterate a copy: stop() removes the instance from _instances.
        for i in list(cls._instances):
            i.stop()
        cls._instances.clear()

    def start(self, shard_id: str = "main"):
        """Initialize the worker pool and start persistent processes.

        Calculates memory budgets, verifies port availability, and launches workers
        in parallel.

        Args:
            shard_id: Optional identifier for the current work shard.

        Raises:
            WorkerStartupError: If workers fail to initialize or contact the bridge.
        """
        with self._lock:
            if self.running:
                return

            # Derive a pseudo-random port offset from (shard, pid, random
            # bytes) so concurrent jobs on the same host land on disjoint
            # port ranges; md5 is used purely as a hash, not for security.
            seed_str = f"{shard_id}-{os.getpid()}-{os.urandom(8).hex()}"
            port_offset = (
                int(hashlib.md5(seed_str.encode(), usedforsecurity=False).hexdigest(), 16) % 10000
            )
            jitter = int.from_bytes(os.urandom(2), "big") % 50 * 10
            current_base_port = self.base_port + port_offset + jitter
            # Calculate memory budget per worker
            effective_memory_limit = calculate_worker_memory_budget(
                num_workers=self.num_workers, explicit_limit_mb=self.memory_limit_mb
            )

            # Port scanning: Ensure the entire range is free
            # (both the worker port and its +100 companion port), shifting
            # the whole range up to 10 times before giving up.
            for _ in range(10):
                if any(
                    is_port_in_use(current_base_port + i)
                    or is_port_in_use(current_base_port + i + 100)
                    for i in range(self.num_workers)
                ):
                    current_base_port += 200  # Shift by a safe margin
                    continue
                break

            # attempt_stack owns the workers while startup is in flight: on
            # failure the with-block tears the partial pool down; on success
            # ownership transfers to self._resource_stack via pop_all().
            with ResourceStack() as attempt_stack:
                try:
                    # Start Health Monitor
                    telemetry_ports = [
                        current_base_port + i + 1000 for i in range(self.num_workers)
                    ]
                    self.monitor = HealthMonitor(ports=telemetry_ports, log_path=self.log_path)
                    self.monitor.start()
                    # NOTE(review): if a worker fails to start below, the
                    # monitor thread is left running while the exception
                    # propagates — confirm callers always invoke stop().

                    for i in range(self.num_workers):
                        unique_shard_id = f"{i}_{uuid.uuid4().hex[:6]}"
                        worker = PersistentWorkerProcess(
                            f"{self.worker_id_prefix}-{i}",
                            current_base_port + i,
                            self.blender_script,
                            self.blender_executable,
                            use_blenderproc=self.use_blenderproc,
                            mock=self.mock,
                            max_renders=self.max_renders_per_worker,
                            shard_id=unique_shard_id,
                            context=self.context,
                            thread_budget=self.thread_budget,
                            seed=self.config.seed,
                            job_id=self.job_id,
                            memory_limit_mb=effective_memory_limit,
                        )
                        worker.start()

                        attempt_stack.push_resource(worker)

                        self.workers.append(worker)
                        self.worker_queue.put(worker)
                    self._resource_stack.enter_context(attempt_stack.pop_all())
                    self.running = True
                except Exception as e:
                    raise WorkerStartupError(f"Startup failed: {e}") from e

    def stop(self):
        """Shutdown all workers and release resources."""
        with self._lock:
            if not self.running:
                return

            logger.info("Stopping Orchestrator and shutting down workers...")
            if self.monitor:
                self.monitor.stop()
            # Closing the stack tears down every worker registered in start()
            # (and any replacements registered by _restart_worker).
            self._resource_stack.close()
            self.workers.clear()

            # Drain any idle workers still parked in the queue; they were
            # already stopped via the resource stack above.
            while not self.worker_queue.empty():
                try:
                    self.worker_queue.get_nowait()
                except queue.Empty:
                    break

            if self.context:
                logger.debug("Terminating ZMQ context...")
                try:
                    # FORCE CLOSE ALL SOCKETS with linger=0.
                    # This is the nuclear option to prevent hangs if any socket was leaked
                    # or if a worker is stuck.
                    self.context.destroy(linger=0)
                    logger.debug("ZMQ context destroyed.")
                except Exception as e:
                    logger.warning(f"Error destroying ZMQ context: {e}")
                self.context = None

            if self in UnifiedWorkerOrchestrator._instances:
                UnifiedWorkerOrchestrator._instances.remove(self)

            self.running = False
            logger.info("Orchestrator stopped.")

    def get_worker(self) -> PersistentWorkerProcess:
        """Acquire an available worker from the queue (blocking)."""
        return self.worker_queue.get()

    def release_worker(self, worker: PersistentWorkerProcess):
        """Return a worker to the pool, handling health checks and restarts.

        If a worker has exceeded its render limit or resource threshold, it is
        restarted before being returned to the queue.
        """
        # A worker that reached its render quota exits on purpose; that is
        # routine recycling, not a failure.
        intentional_exit = (
            worker.max_renders is not None and worker.renders_completed >= worker.max_renders
        )

        should_restart, limit_exceeded = self._check_worker_health(worker, intentional_exit)

        if should_restart or intentional_exit:
            worker = self._restart_worker(worker, limit_exceeded)

        self.worker_queue.put(worker)

    def _check_worker_health(
        self, worker: PersistentWorkerProcess, intentional_exit: bool
    ) -> tuple[bool, bool]:
        """Check if a worker needs to be restarted due to health or resource limits.

        Returns:
            ``(should_restart, limit_exceeded)`` where ``limit_exceeded`` is
            True only for the explicit RESOURCE_LIMIT_EXCEEDED status.
        """
        should_restart = False
        limit_exceeded = False

        if self.monitor:
            snapshot = self.monitor.get_snapshot(worker.worker_id)
            if snapshot:
                telemetry = snapshot.telemetry
                # Record every observation for end-of-run metrics.
                self.auditor.add_entry(worker.worker_id, telemetry)

                # Check for memory or VRAM limits
                if (
                    telemetry.status == WorkerStatus.RESOURCE_LIMIT_EXCEEDED
                    or snapshot.liveness == "UNRESPONSIVE"
                ):
                    limit_exceeded = telemetry.status == WorkerStatus.RESOURCE_LIMIT_EXCEEDED
                    should_restart = True
                    if snapshot.liveness == "UNRESPONSIVE":
                        logger.error(
                            f"Worker {worker.worker_id} is UNRESPONSIVE. Triggering restart."
                        )
                elif (
                    not intentional_exit
                    and self.vram_threshold_mb
                    and telemetry.vram_used_mb > self.vram_threshold_mb
                ):
                    # Preventative restart: VRAM creeping past the threshold.
                    should_restart = True
            elif not intentional_exit:
                # No snapshot yet, might be starting up or stalled
                # We'll trust PersistentWorkerProcess.is_healthy() as fallback
                pass

        # Fallback liveness check independent of telemetry.
        if (
            not should_restart
            and not intentional_exit
            and (not worker.client or not worker.process or not worker.is_healthy())
        ):
            should_restart = True

        return should_restart, limit_exceeded

    def _restart_worker(
        self, worker: PersistentWorkerProcess, limit_exceeded: bool
    ) -> PersistentWorkerProcess:
        """Stop and restart a worker process.

        The replacement keeps the old worker's id and port but gets a fresh
        shard suffix so its outputs are distinguishable.
        """
        if limit_exceeded:
            logger.info(f"Preventative restart for {worker.worker_id} (Resource limit exceeded)")

        worker.stop()
        # Keep the numeric slot (prefix before '_') and re-randomize the suffix.
        slot_id = worker.shard_id.split("_")[0]
        unique_shard_id = f"{slot_id}_{uuid.uuid4().hex[:6]}"
        new_worker = PersistentWorkerProcess(
            worker.worker_id,
            worker.port,
            self.blender_script,
            self.blender_executable,
            use_blenderproc=self.use_blenderproc,
            mock=self.mock,
            max_renders=self.max_renders_per_worker,
            shard_id=unique_shard_id,
            context=self.context,
            thread_budget=self.thread_budget,
            seed=self.config.seed,
            job_id=self.job_id,
            memory_limit_mb=worker.memory_limit_mb,
        )
        new_worker.start()

        # Register the replacement worker for cleanup. Workers created in start() are
        # tracked by _resource_stack, but restarted workers are not — leaving their
        # blender processes as orphans when stop() is called.
        self._resource_stack.push_resource(new_worker)

        # Replace in active workers list
        for idx, w in enumerate(self.workers):
            if w.worker_id == worker.worker_id:
                self.workers[idx] = new_worker
                break
        return new_worker

    def execute_recipe(
        self, recipe: dict, output_dir: Path, rm: str = "cycles", sid: str | None = None
    ) -> Response:
        """Execute a single render job on an available worker.

        Handles retries for transient failures and resource exhaustion.

        Args:
            recipe: The JSON-serializable scene description.
            output_dir: Path where the rendered artifacts will be saved.
            rm: Renderer mode ('cycles', 'eevee', 'workbench').
            sid: Optional shard ID for the render task.

        Returns:
            The worker response containing status and metadata.

        Raises:
            WorkerCommunicationError: If the render fails after all retries.
        """
        max_retries = 2
        attempt = 0
        last_error = None

        while attempt <= max_retries:
            worker = self.get_worker()
            try:
                effective_shard_id = sid if sid is not None else worker.shard_id
                resp = worker.send_command(
                    CommandType.RENDER,
                    {
                        "recipe": recipe,
                        "output_dir": str(output_dir),
                        "renderer_mode": rm,
                        "shard_id": effective_shard_id,
                        "skip_visibility": self.mock,
                    },
                )

                # Check for memory limit exceeded during render.
                # Worker is already dead — restart it via release_worker, then retry
                # without counting as a failed attempt.
                if (
                    resp.status == ResponseStatus.FAILURE
                    and resp.message
                    and "RESOURCE_LIMIT_EXCEEDED" in resp.message
                ):
                    logger.info(
                        f"Worker {worker.worker_id} exceeded resource limits during render. "
                        "Retrying."
                    )
                    # NOTE(review): attempt is deliberately not incremented
                    # here, so a recipe that trips the limit on every worker
                    # retries without bound — confirm this is intended.
                    self.release_worker(worker)
                    continue

                if resp.status == ResponseStatus.SUCCESS:
                    worker.renders_completed += 1
                self.release_worker(worker)
                return resp
            except Exception as e:
                # Communication failure: assume the worker is wedged, stop it,
                # and let release_worker spin up a replacement.
                last_error = e
                logger.warning(f"Render attempt {attempt + 1} failed for {worker.worker_id}: {e}")
                worker.stop()
                self.release_worker(worker)
                attempt += 1

        raise WorkerCommunicationError(
            f"Execute recipe failed after {max_retries} retries: {last_error}"
        )

    def __enter__(self):
        """Context manager entry: starts the orchestrator."""
        if not self.running:
            self.start()
        return self

    def __exit__(self, et, ev, tb):
        """Context manager exit: stops the orchestrator."""
        self.stop()
Functions
__enter__
__enter__()

Context manager entry: starts the orchestrator.

Source code in src/render_tag/orchestration/orchestrator.py
457
458
459
460
461
def __enter__(self):
    """Context manager entry: starts the orchestrator."""
    if not self.running:
        self.start()
    return self
__exit__
__exit__(et, ev, tb)

Context manager exit: stops the orchestrator.

Source code in src/render_tag/orchestration/orchestrator.py
463
464
465
def __exit__(self, et, ev, tb):
    """Context manager exit: stops the orchestrator."""
    self.stop()
cleanup_all classmethod
cleanup_all()

Stop all active orchestrator instances and their workers.

Source code in src/render_tag/orchestration/orchestrator.py
167
168
169
170
171
172
@classmethod
def cleanup_all(cls):
    """Stop all active orchestrator instances and their workers."""
    for i in list(cls._instances):
        i.stop()
    cls._instances.clear()
execute_recipe
execute_recipe(
    recipe: dict,
    output_dir: Path,
    rm: str = "cycles",
    sid: str | None = None,
) -> Response

Execute a single render job on an available worker.

Handles retries for transient failures and resource exhaustion.

Parameters:

Name Type Description Default
recipe dict

The JSON-serializable scene description.

required
output_dir Path

Path where the rendered artifacts will be saved.

required
rm str

Renderer mode ('cycles', 'eevee', 'workbench').

'cycles'
sid str | None

Optional shard ID for the render task.

None

Returns:

Type Description
Response

The worker response containing status and metadata.

Raises:

Type Description
WorkerCommunicationError

If the render fails after all retries.

Source code in src/render_tag/orchestration/orchestrator.py
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
def execute_recipe(
    self, recipe: dict, output_dir: Path, rm: str = "cycles", sid: str | None = None
) -> Response:
    """Execute a single render job on an available worker.

    Handles retries for transient failures and resource exhaustion.

    Args:
        recipe: The JSON-serializable scene description.
        output_dir: Path where the rendered artifacts will be saved.
        rm: Renderer mode ('cycles', 'eevee', 'workbench').
        sid: Optional shard ID for the render task.

    Returns:
        The worker response containing status and metadata.

    Raises:
        WorkerCommunicationError: If the render fails after all retries.
    """
    max_retries = 2
    attempt = 0
    last_error = None

    while attempt <= max_retries:
        worker = self.get_worker()
        try:
            effective_shard_id = sid if sid is not None else worker.shard_id
            resp = worker.send_command(
                CommandType.RENDER,
                {
                    "recipe": recipe,
                    "output_dir": str(output_dir),
                    "renderer_mode": rm,
                    "shard_id": effective_shard_id,
                    "skip_visibility": self.mock,
                },
            )

            # Check for memory limit exceeded during render.
            # Worker is already dead — restart it via release_worker, then retry
            # without counting as a failed attempt.
            if (
                resp.status == ResponseStatus.FAILURE
                and resp.message
                and "RESOURCE_LIMIT_EXCEEDED" in resp.message
            ):
                logger.info(
                    f"Worker {worker.worker_id} exceeded resource limits during render. "
                    "Retrying."
                )
                self.release_worker(worker)
                continue

            if resp.status == ResponseStatus.SUCCESS:
                worker.renders_completed += 1
            self.release_worker(worker)
            return resp
        except Exception as e:
            last_error = e
            logger.warning(f"Render attempt {attempt + 1} failed for {worker.worker_id}: {e}")
            worker.stop()
            self.release_worker(worker)
            attempt += 1

    raise WorkerCommunicationError(
        f"Execute recipe failed after {max_retries} retries: {last_error}"
    )
get_worker
get_worker() -> PersistentWorkerProcess

Acquire an available worker from the queue (blocking).

Source code in src/render_tag/orchestration/orchestrator.py
285
286
287
def get_worker(self) -> PersistentWorkerProcess:
    """Acquire an available worker from the queue (blocking)."""
    return self.worker_queue.get()
release_worker
release_worker(worker: PersistentWorkerProcess)

Return a worker to the pool, handling health checks and restarts.

If a worker has exceeded its render limit or resource threshold, it is restarted before being returned to the queue.

Source code in src/render_tag/orchestration/orchestrator.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
def release_worker(self, worker: PersistentWorkerProcess):
    """Return a worker to the pool, handling health checks and restarts.

    If a worker has exceeded its render limit or resource threshold, it is
    restarted before being returned to the queue.
    """
    intentional_exit = (
        worker.max_renders is not None and worker.renders_completed >= worker.max_renders
    )

    should_restart, limit_exceeded = self._check_worker_health(worker, intentional_exit)

    if should_restart or intentional_exit:
        worker = self._restart_worker(worker, limit_exceeded)

    self.worker_queue.put(worker)
start
start(shard_id: str = 'main')

Initialize the worker pool and start persistent processes.

Calculates memory budgets, verifies port availability, and launches workers in parallel.

Parameters:

Name Type Description Default
shard_id str

Optional identifier for the current work shard.

'main'

Raises:

Type Description
WorkerStartupError

If workers fail to initialize or contact the bridge.

Source code in src/render_tag/orchestration/orchestrator.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def start(self, shard_id: str = "main"):
    """Initialize the worker pool and start persistent processes.

    Calculates memory budgets, verifies port availability, and launches workers
    in parallel.

    Args:
        shard_id: Optional identifier for the current work shard.

    Raises:
        WorkerStartupError: If workers fail to initialize or contact the bridge.
    """
    with self._lock:
        if self.running:
            return

        seed_str = f"{shard_id}-{os.getpid()}-{os.urandom(8).hex()}"
        port_offset = (
            int(hashlib.md5(seed_str.encode(), usedforsecurity=False).hexdigest(), 16) % 10000
        )
        jitter = int.from_bytes(os.urandom(2), "big") % 50 * 10
        current_base_port = self.base_port + port_offset + jitter
        # Calculate memory budget per worker
        effective_memory_limit = calculate_worker_memory_budget(
            num_workers=self.num_workers, explicit_limit_mb=self.memory_limit_mb
        )

        # Port scanning: Ensure the entire range is free
        for _ in range(10):
            if any(
                is_port_in_use(current_base_port + i)
                or is_port_in_use(current_base_port + i + 100)
                for i in range(self.num_workers)
            ):
                current_base_port += 200  # Shift by a safe margin
                continue
            break

        with ResourceStack() as attempt_stack:
            try:
                # Start Health Monitor
                telemetry_ports = [
                    current_base_port + i + 1000 for i in range(self.num_workers)
                ]
                self.monitor = HealthMonitor(ports=telemetry_ports, log_path=self.log_path)
                self.monitor.start()

                for i in range(self.num_workers):
                    unique_shard_id = f"{i}_{uuid.uuid4().hex[:6]}"
                    worker = PersistentWorkerProcess(
                        f"{self.worker_id_prefix}-{i}",
                        current_base_port + i,
                        self.blender_script,
                        self.blender_executable,
                        use_blenderproc=self.use_blenderproc,
                        mock=self.mock,
                        max_renders=self.max_renders_per_worker,
                        shard_id=unique_shard_id,
                        context=self.context,
                        thread_budget=self.thread_budget,
                        seed=self.config.seed,
                        job_id=self.job_id,
                        memory_limit_mb=effective_memory_limit,
                    )
                    worker.start()

                    attempt_stack.push_resource(worker)

                    self.workers.append(worker)
                    self.worker_queue.put(worker)
                self._resource_stack.enter_context(attempt_stack.pop_all())
                self.running = True
            except Exception as e:
                raise WorkerStartupError(f"Startup failed: {e}") from e
stop
stop()

Shutdown all workers and release resources.

Source code in src/render_tag/orchestration/orchestrator.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def stop(self):
    """Shutdown all workers and release resources."""
    with self._lock:
        if not self.running:
            return

        logger.info("Stopping Orchestrator and shutting down workers...")
        if self.monitor:
            self.monitor.stop()
        self._resource_stack.close()
        self.workers.clear()

        while not self.worker_queue.empty():
            try:
                self.worker_queue.get_nowait()
            except queue.Empty:
                break

        if self.context:
            logger.debug("Terminating ZMQ context...")
            try:
                # FORCE CLOSE ALL SOCKETS with linger=0.
                # This is the nuclear option to prevent hangs if any socket was leaked
                # or if a worker is stuck.
                self.context.destroy(linger=0)
                logger.debug("ZMQ context destroyed.")
            except Exception as e:
                logger.warning(f"Error destroying ZMQ context: {e}")
            self.context = None

        if self in UnifiedWorkerOrchestrator._instances:
            UnifiedWorkerOrchestrator._instances.remove(self)

        self.running = False
        logger.info("Orchestrator stopped.")

Functions

get_completed_scene_ids

get_completed_scene_ids(output_dir: Path) -> set[int]

Scan output directory for completed scene metadata files.

Source code in src/render_tag/orchestration/orchestrator.py
468
469
470
471
472
473
474
475
476
477
478
479
def get_completed_scene_ids(output_dir: Path) -> set[int]:
    """Collect scene IDs that already have metadata written to disk.

    Scans ``<output_dir>/images`` for files named
    ``scene_<id>[_cam_<n>]_meta.json`` and returns the distinct integer
    scene IDs found. A missing images directory yields an empty set.
    """
    images_dir = output_dir / "images"
    if not images_dir.exists():
        return set()
    meta_name = re.compile(r"scene_(\d+)(?:_cam_\d+)?_meta\.json")
    candidates = (meta_name.match(p.name) for p in images_dir.glob("*.json"))
    return {int(m.group(1)) for m in candidates if m}

orchestrate

orchestrate(
    job_spec: JobSpec,
    workers: int = 1,
    executor_type: str = "local",
    resume: bool = False,
    batch_size: int = 5,
    verbose: bool = False,
    progress_cb: Callable[[int, int], None] | None = None,
) -> OrchestrationResult

Main orchestration loop for executing a JobSpec.

Handles sharding, resumption, and parallel execution of render tasks.

Parameters:

Name Type Description Default
job_spec JobSpec

Detailed specification of the rendering job.

required
workers int

Number of parallel worker processes to spawn.

1
executor_type str

Infrastructure target ('local', 'cloud').

'local'
resume bool

If True, skips already completed scenes.

False
batch_size int

Number of recipes per worker batch.

5
verbose bool

If True, enables debug logging.

False
progress_cb Callable[[int, int], None] | None

Optional callback(increment, total) for progress reporting.

None

Returns:

Type Description
OrchestrationResult

OrchestrationResult containing detailed execution metrics.

Source code in src/render_tag/orchestration/orchestrator.py
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
def orchestrate(
    job_spec: JobSpec,
    workers: int = 1,
    executor_type: str = "local",
    resume: bool = False,
    batch_size: int = 5,
    verbose: bool = False,
    progress_cb: Callable[[int, int], None] | None = None,
) -> OrchestrationResult:
    """Main orchestration loop for executing a JobSpec.

    Handles sharding, resumption, and parallel execution of render tasks.

    Args:
        job_spec: Detailed specification of the rendering job.
        workers: Number of parallel worker processes to spawn.
        executor_type: Infrastructure target ('local', 'cloud').
        resume: If True, skips already completed scenes.
        batch_size: Number of recipes per worker batch.
        verbose: If True, enables debug logging.
        progress_cb: Optional callback(increment, total) for progress reporting.

    Returns:
        OrchestrationResult containing detailed execution metrics.
    """
    start_time = time.perf_counter()
    # Install process-wide handlers so Ctrl-C / SIGTERM route to cleanup.
    signal.signal(signal.SIGINT, _signal_handler)
    signal.signal(signal.SIGTERM, _signal_handler)

    # NOTE(review): executor_type and verbose are accepted but never read in
    # this body — confirm whether they are vestigial or consumed elsewhere.
    output_dir = job_spec.paths.output_dir

    # Pre-calculate metadata for provenance
    job_spec_json = job_spec.model_dump_json() if hasattr(job_spec, "model_dump_json") else "{}"
    job_spec_hash = hashlib.sha256(job_spec_json.encode()).hexdigest()
    env_hash = hashlib.sha256(sys.version.encode()).hexdigest()  # Placeholder for real env state

    batches, total_to_process, _total_shards = _prepare_batches(
        job_spec, workers, batch_size, resume
    )

    # _prepare_batches signals "everything already done" with None and
    # "nothing to schedule" with an empty list; both short-circuit here.
    if batches is None:
        # All complete
        return OrchestrationResult(
            success_count=0,
            skipped_count=job_spec.shard_size,
            timings=ExecutionTimings(total_duration_s=time.perf_counter() - start_time),
            metadata=JobMetadata(job_spec_hash=job_spec_hash, env_state_hash=env_hash),
        )

    if not batches:
        return OrchestrationResult(
            timings=ExecutionTimings(total_duration_s=time.perf_counter() - start_time),
            metadata=JobMetadata(job_spec_hash=job_spec_hash, env_state_hash=env_hash),
        )

    rm = job_spec.scene_config.renderer.mode if job_spec.scene_config.renderer else "cycles"
    # Mock rendering is forced under pytest or via the env override; real
    # blenderproc is used only when the executable is actually on PATH.
    force_mock = (os.environ.get("RENDER_TAG_FORCE_MOCK") == "1") or (
        "PYTEST_CURRENT_TEST" in os.environ
    )
    use_bproc = (shutil.which("blenderproc") is not None) and not force_mock

    config = OrchestratorConfig(
        num_workers=workers,
        ephemeral=True,
        max_renders_per_worker=batch_size,
        mock=not use_bproc,
        seed=job_spec.global_seed,
        memory_limit_mb=job_spec.infrastructure.max_memory_mb,
    )

    errors = []
    log_path = output_dir / "telemetry.ndjson"
    with UnifiedWorkerOrchestrator(config=config, log_path=log_path) as orchestrator:
        # Report total scenes to progress_cb if provided
        # Approximate total for progress bar initialization
        if progress_cb:
            progress_cb(0, total_to_process)

        errors = _run_orchestration_loop(
            orchestrator, batches, workers, output_dir, rm, progress_cb=progress_cb
        )

    # Batch files are transient scheduling artifacts; clean them up.
    for path in batches:
        if path.exists():
            path.unlink()

    total_duration = time.perf_counter() - start_time

    # Collect final metrics
    # (orchestrator is stopped by the with-block, but its auditor's records
    # remain readable afterwards.)
    max_ram = 0.0
    max_vram = 0.0
    if orchestrator.auditor.records:
        max_ram = max(
            (r.get("ram_used_mb", 0.0) for r in orchestrator.auditor.records), default=0.0
        )
        max_vram = max(
            (r.get("vram_used_mb", 0.0) for r in orchestrator.auditor.records), default=0.0
        )

    return OrchestrationResult(
        success_count=total_to_process - len(errors),
        failed_count=len(errors),
        errors=errors,
        worker_metrics=WorkerMetrics(worker_id="pool", max_ram_mb=max_ram, max_vram_mb=max_vram),
        timings=ExecutionTimings(total_duration_s=total_duration),
        metadata=JobMetadata(job_spec_hash=job_spec_hash, env_state_hash=env_hash),
    )

resolve_shard_index

resolve_shard_index() -> int

Resolve the current shard index from cloud environment variables.

Source code in src/render_tag/orchestration/orchestrator.py
482
483
484
485
486
487
def resolve_shard_index() -> int:
    """Resolve the current shard index from cloud environment variables.

    Checks the well-known array-job index variables for AWS Batch, Cloud Run,
    and Kubernetes indexed Jobs, in that order of precedence.

    Returns:
        The shard index from the first variable found, or -1 when none is set.
    """
    candidates = (
        "AWS_BATCH_JOB_ARRAY_INDEX",
        "CLOUD_RUN_TASK_INDEX",
        "JOB_COMPLETION_INDEX",
    )
    for name in candidates:
        value = os.environ.get(name)
        if value is not None:
            return int(value)
    return -1

render_tag.orchestration.worker

Worker process management for render-tag.

Classes

PersistentWorkerProcess

Lifecycle manager for a Blender subprocess with ZMQ IPC.

Source code in src/render_tag/orchestration/worker.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
class PersistentWorkerProcess:
    """Lifecycle manager for a Blender subprocess with ZMQ IPC."""

    def __init__(
        self,
        worker_id: str,
        port: int,
        blender_script: Path,
        blender_executable: str = "blenderproc",
        startup_timeout: int = 60,
        use_blenderproc: bool = True,
        mock: bool = False,
        max_renders: int | None = None,
        shard_id: str = "main",
        context: Any = None,
        thread_budget: int = 1,
        seed: int = 42,
        job_id: str | None = None,
        memory_limit_mb: int | None = None,
    ):
        """Store configuration only; the subprocess is launched by ``start()``.

        Args:
            worker_id: Identifier used for logging and logger binding.
            port: Main ZMQ command port; management port is ``port + 100``.
            blender_script: Script the worker subprocess executes.
            blender_executable: Launcher binary (ignored when ``mock`` is True).
            startup_timeout: Seconds to wait for the worker to become healthy.
            use_blenderproc: If True, launch via ``<executable> run <script>``.
            mock: Run the script under the current Python interpreter instead.
            max_renders: Optional render cap forwarded to the worker CLI.
            shard_id: Shard label forwarded to the worker CLI.
            context: Optional ZMQ context shared with the host client.
            thread_budget: Thread budget exported to the subprocess env.
            seed: Deterministic seed forwarded to the worker CLI.
            job_id: Optional job identifier for logging/env propagation.
            memory_limit_mb: Optional memory cap forwarded to the worker CLI.
        """
        self.worker_id, self.port = worker_id, port
        # Convention: the management channel lives 100 ports above the command port.
        self.mgmt_port = port + 100
        self.job_id = job_id
        self.memory_limit_mb = memory_limit_mb
        self.blender_script, self.blender_executable = blender_script, blender_executable
        self.startup_timeout, self.use_blenderproc, self.mock = (
            startup_timeout,
            use_blenderproc,
            mock,
        )
        self.max_renders, self.shard_id, self.context = max_renders, shard_id, context
        self.thread_budget = thread_budget
        self.seed = seed
        self.process, self.client = None, None
        self.renders_completed = 0
        self._stop_event = threading.Event()
        self._log_thread: threading.Thread | None = None
        # Non-JSON startup output, capped at 50 lines, kept for crash diagnostics.
        self._startup_logs = []
        self.logger = get_logger(f"worker.{worker_id}").bind(worker_id=worker_id, job_id=job_id)

    def _log_router(self):
        """Forward the subprocess's stdout to the structured logger.

        Runs on a daemon thread started by ``start()``. JSON lines are logged
        at INFO; non-JSON lines are buffered into ``_startup_logs`` (first 50)
        and logged at DEBUG.
        """
        if not self.process or not self.process.stdout:
            return
        while not self._stop_event.is_set():
            try:
                line_bytes = self.process.stdout.readline()
            except (ValueError, OSError):
                break  # stdout was closed
            if not isinstance(line_bytes, bytes) or line_bytes == b"":
                break  # EOF or non-bytes (mock)
            line = line_bytes.decode("utf-8").rstrip()
            if not line:
                continue
            try:
                data = orjson.loads(line)
                self.logger.info(f"[{self.worker_id}] {data.get('message', '')}")
            except (orjson.JSONDecodeError, ValueError):
                if len(self._startup_logs) < 50:
                    self._startup_logs.append(line)
                self.logger.debug(f"[{self.worker_id}] {line}")

    def start(self):
        """Launch the subprocess, connect ZMQ, and wait for initialization.

        Raises:
            WorkerStartupError: If the process exits during startup, fails the
                INIT command, or does not become healthy within
                ``startup_timeout`` seconds.
        """
        # Mock mode runs the script under the current Python interpreter.
        exec_to_use = sys.executable if self.mock else self.blender_executable

        if self.mock:
            cmd = [exec_to_use, str(self.blender_script), "--port", str(self.port)]
        else:
            base = (
                [exec_to_use, "run", str(self.blender_script)]
                if self.use_blenderproc
                else [exec_to_use, str(self.blender_script)]
            )
            cmd = [*base, "--port", str(self.port), "--mgmt-port", str(self.mgmt_port)]

        if self.mock:
            cmd.append("--mock")
        if self.max_renders:
            cmd.extend(["--max-renders", str(self.max_renders)])
        if self.shard_id:
            cmd.extend(["--shard-id", str(self.shard_id)])
        if self.memory_limit_mb:
            cmd.extend(["--memory-limit-mb", str(self.memory_limit_mb)])
        cmd.extend(["--seed", str(self.seed)])

        from render_tag.core.utils import get_subprocess_env

        env = get_subprocess_env(
            base_env=os.environ,
            thread_budget=self.thread_budget,
            job_id=self.job_id,
            mock=self.mock,
        )

        self.logger.info(f"Launching worker with command: {cmd}")
        # start_new_session puts the worker in its own process group so stop()
        # can killpg() it without touching the orchestrator.
        self.process = subprocess.Popen(
            cmd,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            start_new_session=True,
            preexec_fn=set_worker_priority,
        )
        self._log_thread = threading.Thread(target=self._log_router, daemon=True)
        self._log_thread.start()
        self.client = ZmqHostClient(port=self.port, mgmt_port=self.mgmt_port, context=self.context)
        self.client.connect()

        start = time.time()
        while time.time() - start < self.startup_timeout:
            if self.process.poll() is not None:
                # Process died before becoming healthy: surface captured output.
                out = "\n".join(self._startup_logs)
                if not out and self.process.stdout:
                    with contextlib.suppress(Exception):
                        out = self.process.stdout.read(1000).decode()
                raise WorkerStartupError(f"Worker crashed during startup. Output:\n{out}")
            try:
                if self.is_healthy():
                    init_resp = self.send_command(CommandType.INIT, {}, timeout_ms=10000)
                    if init_resp.status != ResponseStatus.SUCCESS:
                        raise WorkerStartupError(
                            f"Worker initialization failed: {init_resp.message}"
                        )
                    return
            # Health/INIT checks race with worker startup; keep polling until timeout.
            except Exception:
                pass
            time.sleep(0.5)
        self.stop()
        raise WorkerStartupError("Worker timeout")

    def _collect_descendant_pids(self, pid: int) -> list[int]:
        """Collect all descendant PIDs via /proc before the process tree disappears."""
        descendants = []
        try:
            # psutil imported lazily; if unavailable, fall back to killpg-only cleanup.
            import psutil

            parent = psutil.Process(pid)
            for child in parent.children(recursive=True):
                descendants.append(child.pid)
        except Exception:
            pass
        return descendants

    def stop(self):
        """Shut down the worker process and its communication client."""
        # Stop the log-router thread's read loop before tearing anything down.
        self._stop_event.set()

        # Collect all descendant PIDs while the process tree is still alive.
        # blenderproc spawns blender which may be in a different process group,
        # so killpg alone won't reach it.
        descendant_pids: list[int] = []
        if self.process and self.process.poll() is None:
            descendant_pids = self._collect_descendant_pids(self.process.pid)

        if self.client:
            if self.process and self.process.poll() is None:
                self.logger.info(f"Sending SHUTDOWN command to worker {self.worker_id}...")
                # Best-effort: the worker may already be unresponsive.
                with contextlib.suppress(Exception):
                    self.client.send_command(CommandType.SHUTDOWN, timeout_ms=500)

                # Give the worker a chance to finalize writers gracefully
                try:
                    self.process.wait(timeout=2.0)
                except subprocess.TimeoutExpired:
                    self.logger.warning(
                        f"Worker {self.worker_id} did not exit gracefully, forcing kill."
                    )
            self.client.disconnect()
            self.client = None

        if self.process:
            pid = self.process.pid
            self.logger.info(f"Terminating worker {self.worker_id} (PID: {pid})...")
            try:
                # Safety check to avoid killpg(0) or killpg(1).
                # Handle MagicMock in tests by forcing to int if possible.
                actual_pid = int(pid) if not isinstance(pid, (int, float)) else pid
                if actual_pid > 1:
                    # Kill the process group (covers processes that share the PGID).
                    with contextlib.suppress(ProcessLookupError, PermissionError):
                        os.killpg(actual_pid, signal.SIGKILL)

                    # Kill any descendants that escaped the process group
                    # (e.g., blender spawned by blenderproc with its own PGID).
                    for cpid in descendant_pids:
                        with contextlib.suppress(ProcessLookupError, PermissionError):
                            os.kill(cpid, signal.SIGKILL)

                if self.process.poll() is None:
                    self.process.wait(timeout=2)
                self.logger.info(f"Worker {self.worker_id} terminated.")
            except (
                subprocess.TimeoutExpired,
                ProcessLookupError,
                PermissionError,
                ValueError,
                TypeError,
            ):
                self.logger.debug(f"Process cleanup issues for {self.worker_id}, ignoring.")

            # Close stdout so _log_router's readline() returns b"" and exits.
            # Must happen after kill — closing before kill risks losing final output.
            with contextlib.suppress(Exception):
                if self.process.stdout:
                    self.process.stdout.close()

            self.process = None

        # Join the log thread to prevent daemon thread accumulation across tests.
        if self._log_thread is not None:
            self._log_thread.join(timeout=2.0)
            self._log_thread = None

    @retry_with_backoff(retries=2, initial_delay=0.1, exceptions=(Exception,))
    def is_healthy(self) -> bool:
        """Return True if the subprocess is alive and answers a STATUS ping."""
        if not self.process or self.process.poll() is not None or not self.client:
            return False
        try:
            resp = self.client.send_command(CommandType.STATUS, timeout_ms=2000)
            return resp.status == ResponseStatus.SUCCESS
        except Exception:
            return False

    def send_command(
        self, ct: CommandType, payload: dict | None = None, timeout_ms: int | None = None
    ) -> Response:
        """Send a command to the worker, with a liveness probe for early abort."""
        return self.client.send_command(
            ct,
            payload,
            timeout_ms=timeout_ms,
            check_liveness=lambda: self.process.poll() is None,
        )
Functions
stop
stop()

Shut down the worker process and its communication client.

Source code in src/render_tag/orchestration/worker.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def stop(self):
    """Shut down the worker process and its communication client."""
    # Stop the log-router thread's read loop before tearing anything down.
    self._stop_event.set()

    # Collect all descendant PIDs while the process tree is still alive.
    # blenderproc spawns blender which may be in a different process group,
    # so killpg alone won't reach it.
    descendant_pids: list[int] = []
    if self.process and self.process.poll() is None:
        descendant_pids = self._collect_descendant_pids(self.process.pid)

    if self.client:
        if self.process and self.process.poll() is None:
            self.logger.info(f"Sending SHUTDOWN command to worker {self.worker_id}...")
            # Best-effort: the worker may already be unresponsive.
            with contextlib.suppress(Exception):
                self.client.send_command(CommandType.SHUTDOWN, timeout_ms=500)

            # Give the worker a chance to finalize writers gracefully
            try:
                self.process.wait(timeout=2.0)
            except subprocess.TimeoutExpired:
                self.logger.warning(
                    f"Worker {self.worker_id} did not exit gracefully, forcing kill."
                )
        self.client.disconnect()
        self.client = None

    if self.process:
        pid = self.process.pid
        self.logger.info(f"Terminating worker {self.worker_id} (PID: {pid})...")
        try:
            # Safety check to avoid killpg(0) or killpg(1).
            # Handle MagicMock in tests by forcing to int if possible.
            actual_pid = int(pid) if not isinstance(pid, (int, float)) else pid
            if actual_pid > 1:
                # Kill the process group (covers processes that share the PGID).
                with contextlib.suppress(ProcessLookupError, PermissionError):
                    os.killpg(actual_pid, signal.SIGKILL)

                # Kill any descendants that escaped the process group
                # (e.g., blender spawned by blenderproc with its own PGID).
                for cpid in descendant_pids:
                    with contextlib.suppress(ProcessLookupError, PermissionError):
                        os.kill(cpid, signal.SIGKILL)

            if self.process.poll() is None:
                self.process.wait(timeout=2)
            self.logger.info(f"Worker {self.worker_id} terminated.")
        except (
            subprocess.TimeoutExpired,
            ProcessLookupError,
            PermissionError,
            ValueError,
            TypeError,
        ):
            self.logger.debug(f"Process cleanup issues for {self.worker_id}, ignoring.")

        # Close stdout so _log_router's readline() returns b"" and exits.
        # Must happen after kill — closing before kill risks losing final output.
        with contextlib.suppress(Exception):
            if self.process.stdout:
                self.process.stdout.close()

        self.process = None

    # Join the log thread to prevent daemon thread accumulation across tests.
    if self._log_thread is not None:
        self._log_thread.join(timeout=2.0)
        self._log_thread = None

Functions

set_worker_priority

set_worker_priority()

Linux-specific: Set process priority for workers and ensure death with parent.

Source code in src/render_tag/orchestration/worker.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def set_worker_priority():
    """Linux-specific: Set process priority for workers and ensure death with parent."""
    import ctypes
    import os
    import signal

    # De-escalate scheduling priority (niceness) so workers yield CPU to the host.
    with contextlib.suppress(Exception):
        os.nice(10)

    # PR_SET_PDEATHSIG (== 1): ask the kernel to SIGKILL this process when its
    # parent dies, so orphaned workers cannot linger.
    with contextlib.suppress(Exception):
        ctypes.CDLL("libc.so.6").prctl(1, signal.SIGKILL, 0, 0, 0)

Dataset readers and writers for standard formats (COCO, CSV, Rich Truth).

render_tag.data_io.readers

Structured readers for render-tag datasets.

Provides Pydantic-validated ingestion of rich_truth.json with convenience accessors for calibration workflows. The CalibrationFrame class yields matched 2D/3D point pairs ready for cv2.solvePnP or cv2.calibrateCamera.

Attributes

Classes

CalibrationFrame

A single image's calibration data with matched 2D-3D point pairs.

Parameters:

Name Type Description Default
board_record DetectionRecord

The BOARD-type DetectionRecord for this image.

required
Source code in src/render_tag/data_io/readers.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class CalibrationFrame:
    """A single image's calibration data with matched 2D-3D point pairs.

    Args:
        board_record: The BOARD-type DetectionRecord for this image.
    """

    def __init__(self, image_id: str, board_record: DetectionRecord) -> None:
        if board_record.board_definition is None:
            raise ValueError(f"BOARD record for {image_id} has no board_definition")
        self.record = board_record
        self._bd: BoardDefinition = board_record.board_definition

    @property
    def image_id(self) -> str:
        """Image identifier (derived from the underlying record)."""
        return self.record.image_id

    @property
    def board_definition(self) -> BoardDefinition:
        """The board geometry descriptor."""
        return self._bd

    @functools.cached_property
    def k_matrix(self) -> np.ndarray:
        """3x3 camera intrinsic matrix."""
        k = self.record.k_matrix
        if k is None:
            raise ValueError(f"BOARD record for {self.image_id} has no k_matrix")
        return np.array(k)

    @property
    def resolution(self) -> tuple[int, int]:
        """(width, height) in pixels."""
        res = self.record.resolution
        if res is None:
            raise ValueError(f"BOARD record for {self.image_id} has no resolution")
        return (res[0], res[1])

    def get_valid_calibration_pairs(
        self,
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Extract matched 2D image points and 3D object points, filtering sentinels.

        Returns:
            Tuple of ``(obj_pts_3d, img_pts_2d, valid_ids)`` where:

            - ``obj_pts_3d``: ``(N, 3)`` float64 array in mm (board-local frame)
            - ``img_pts_2d``: ``(N, 2)`` float64 array in pixels
            - ``valid_ids``:  ``(N,)`` int32 array of keypoint indices
        """
        keypoints = self.record.keypoints
        if keypoints is None:
            return (
                np.empty((0, 3), dtype=np.float64),
                np.empty((0, 2), dtype=np.float64),
                np.empty((0,), dtype=np.int32),
            )

        all_obj_pts = _compute_object_points_3d(self._bd)
        n_obj = len(all_obj_pts)

        # Keep only visible keypoints that have a matching 3D object point.
        matched = [
            (idx, (u, v), all_obj_pts[idx])
            for idx, (u, v) in enumerate(keypoints)
            if not is_sentinel_keypoint(u, v) and idx < n_obj
        ]
        valid_ids = [m[0] for m in matched]
        img_pts = [m[1] for m in matched]
        obj_pts = [m[2] for m in matched]

        # reshape keeps the (0, k) shapes correct when nothing is valid.
        return (
            np.array(obj_pts, dtype=np.float64).reshape(-1, 3),
            np.array(img_pts, dtype=np.float64).reshape(-1, 2),
            np.array(valid_ids, dtype=np.int32),
        )

    def get_all_keypoints_with_visibility(
        self,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Return all keypoints and a boolean visibility mask.

        Returns:
            Tuple of ``(keypoints_2d, visibility)`` where:

            - ``keypoints_2d``: ``(total_keypoints, 2)`` array (sentinels preserved)
            - ``visibility``:   ``(total_keypoints,)`` bool array
        """
        if self.record.keypoints is None:
            # No keypoints recorded: everything is hidden at the (-1, -1) sentinel.
            total = self._bd.total_keypoints
            return (
                np.full((total, 2), -1.0, dtype=np.float64),
                np.zeros(total, dtype=bool),
            )

        kp = np.array(self.record.keypoints, dtype=np.float64)
        # A keypoint is visible unless both coordinates equal KEYPOINT_SENTINEL.
        visibility = ~np.all(kp == KEYPOINT_SENTINEL, axis=1)
        return kp, visibility
Attributes
board_definition property
board_definition: BoardDefinition

The board geometry descriptor.

image_id property
image_id: str

Image identifier (derived from the underlying record).

k_matrix cached property
k_matrix: ndarray

3x3 camera intrinsic matrix.

resolution property
resolution: tuple[int, int]

(width, height) in pixels.

Functions
get_all_keypoints_with_visibility
get_all_keypoints_with_visibility() -> tuple[
    np.ndarray, np.ndarray
]

Return all keypoints and a boolean visibility mask.

Returns:

Type Description
ndarray

Tuple of (keypoints_2d, visibility) where:

ndarray
  • keypoints_2d: (total_keypoints, 2) array (sentinels preserved)
tuple[ndarray, ndarray]
  • visibility: (total_keypoints,) bool array
Source code in src/render_tag/data_io/readers.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def get_all_keypoints_with_visibility(
    self,
) -> tuple[np.ndarray, np.ndarray]:
    """Return all keypoints and a boolean visibility mask.

    Returns:
        Tuple of ``(keypoints_2d, visibility)`` where:

        - ``keypoints_2d``: ``(total_keypoints, 2)`` array (sentinels preserved)
        - ``visibility``:   ``(total_keypoints,)`` bool array
    """
    if self.record.keypoints is None:
        # No keypoints recorded: every keypoint is hidden at the (-1, -1) sentinel.
        n = self._bd.total_keypoints
        return (
            np.full((n, 2), -1.0, dtype=np.float64),
            np.zeros(n, dtype=bool),
        )

    kp = np.array(self.record.keypoints, dtype=np.float64)
    # A keypoint is visible unless both coordinates equal KEYPOINT_SENTINEL.
    vis = ~np.all(kp == KEYPOINT_SENTINEL, axis=1)
    return kp, vis
get_valid_calibration_pairs
get_valid_calibration_pairs() -> tuple[
    np.ndarray, np.ndarray, np.ndarray
]

Extract matched 2D image points and 3D object points, filtering sentinels.

Returns:

Type Description
ndarray

Tuple of (obj_pts_3d, img_pts_2d, valid_ids) where:

ndarray
  • obj_pts_3d: (N, 3) float64 array in mm (board-local frame)
ndarray
  • img_pts_2d: (N, 2) float64 array in pixels
tuple[ndarray, ndarray, ndarray]
  • valid_ids: (N,) int32 array of keypoint indices
Source code in src/render_tag/data_io/readers.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def get_valid_calibration_pairs(
    self,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Extract matched 2D image points and 3D object points, filtering sentinels.

    Returns:
        Tuple of ``(obj_pts_3d, img_pts_2d, valid_ids)`` where:

        - ``obj_pts_3d``: ``(N, 3)`` float64 array in mm (board-local frame)
        - ``img_pts_2d``: ``(N, 2)`` float64 array in pixels
        - ``valid_ids``:  ``(N,)`` int32 array of keypoint indices
    """
    if self.record.keypoints is None:
        return (
            np.empty((0, 3), dtype=np.float64),
            np.empty((0, 2), dtype=np.float64),
            np.empty((0,), dtype=np.int32),
        )

    all_obj_pts = _compute_object_points_3d(self._bd)
    keypoints = self.record.keypoints

    valid_ids: list[int] = []
    img_pts: list[tuple[float, float]] = []
    obj_pts: list[tuple[float, float, float]] = []

    for i, (x, y) in enumerate(keypoints):
        # Skip sentinel (invisible) keypoints entirely.
        if is_sentinel_keypoint(x, y):
            continue
        # Guard: keypoints beyond the board's object-point table are dropped.
        if i < len(all_obj_pts):
            valid_ids.append(i)
            img_pts.append((x, y))
            obj_pts.append(all_obj_pts[i])

    # reshape keeps the (0, k) shapes correct when nothing is valid.
    return (
        np.array(obj_pts, dtype=np.float64).reshape(-1, 3),
        np.array(img_pts, dtype=np.float64).reshape(-1, 2),
        np.array(valid_ids, dtype=np.int32),
    )

RenderTagDataset

Structured reader for rich_truth.json datasets.

Loads and validates all detection records through Pydantic, providing typed access to tags, boards, and calibration data.

Parameters:

Name Type Description Default
dataset_path Path | str

Directory containing rich_truth.json.

required
Source code in src/render_tag/data_io/readers.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
class RenderTagDataset:
    """Structured reader for ``rich_truth.json`` datasets.

    Loads and validates all detection records through Pydantic, providing
    typed access to tags, boards, and calibration data.

    Args:
        dataset_path: Directory containing ``rich_truth.json``.
    """

    def __init__(self, dataset_path: Path | str) -> None:
        root = Path(dataset_path)
        self._path = root
        self._records = _load_records(root / "rich_truth.json")
        self._index = _build_index(self._records)

    @classmethod
    def from_json(cls, path: Path | str) -> RenderTagDataset:
        """Load from a specific ``rich_truth.json`` file path.

        Handles backward compatibility: if loaded JSON has ``board_definition``
        nested inside ``metadata`` (old format), it is migrated to the top-level
        field before validation.
        """
        json_path = Path(path)
        # Bypass __init__, which expects a dataset directory rather than a file.
        dataset = object.__new__(cls)
        dataset._path = json_path.parent
        dataset._records = _load_records(json_path)
        dataset._index = _build_index(dataset._records)
        return dataset

    @classmethod
    def from_records(cls, records: list[DetectionRecord]) -> RenderTagDataset:
        """Create from an existing list of DetectionRecords."""
        dataset = object.__new__(cls)
        dataset._path = Path(".")
        dataset._records = list(records)
        dataset._index = _build_index(dataset._records)
        return dataset

    @property
    def records(self) -> list[DetectionRecord]:
        """All detection records in the dataset."""
        return list(self._records)

    @property
    def image_ids(self) -> list[str]:
        """Unique image IDs in insertion order."""
        return [*self._index]

    def get_records(self, image_id: str) -> list[DetectionRecord]:
        """All records for a given image."""
        return list(self._index.get(image_id, ()))

    def get_board_record(self, image_id: str) -> DetectionRecord | None:
        """The BOARD record for an image, or None."""
        boards = (r for r in self._index.get(image_id, ()) if r.record_type == "BOARD")
        return next(boards, None)

    def get_tag_records(self, image_id: str) -> list[DetectionRecord]:
        """All TAG records for an image."""
        return [r for r in self._index.get(image_id, ()) if r.record_type == "TAG"]

    def iter_calibration_frames(self) -> Iterator[CalibrationFrame]:
        """Yield a :class:`CalibrationFrame` for each image that has a BOARD record."""
        for image_id in self.image_ids:
            frame = self.get_calibration_frame(image_id)
            if frame is not None:
                yield frame

    def get_calibration_frame(self, image_id: str) -> CalibrationFrame | None:
        """Return a CalibrationFrame for the given image, or None."""
        board = self.get_board_record(image_id)
        if board is None or board.board_definition is None:
            return None
        return CalibrationFrame(image_id, board)

    @property
    def board_definition(self) -> BoardDefinition | None:
        """The board definition shared by all frames (from the first BOARD record)."""
        definitions = (
            r.board_definition
            for r in self._records
            if r.record_type == "BOARD" and r.board_definition is not None
        )
        return next(definitions, None)
Attributes
board_definition property
board_definition: BoardDefinition | None

The board definition shared by all frames (from the first BOARD record).

image_ids property
image_ids: list[str]

Unique image IDs in insertion order.

records property
records: list[DetectionRecord]

All detection records in the dataset.

Functions
from_json classmethod
from_json(path: Path | str) -> RenderTagDataset

Load from a specific rich_truth.json file path.

Handles backward compatibility: if loaded JSON has board_definition nested inside metadata (old format), it is migrated to the top-level field before validation.

Source code in src/render_tag/data_io/readers.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@classmethod
def from_json(cls, path: Path | str) -> RenderTagDataset:
    """Load from a specific ``rich_truth.json`` file path.

    Handles backward compatibility: if loaded JSON has ``board_definition``
    nested inside ``metadata`` (old format), it is migrated to the top-level
    field before validation.
    """
    path = Path(path)
    # Bypass __init__, which expects a dataset directory rather than a file path.
    instance = object.__new__(cls)
    instance._path = path.parent
    instance._records = _load_records(path)
    instance._index = _build_index(instance._records)
    return instance
from_records classmethod
from_records(
    records: list[DetectionRecord],
) -> RenderTagDataset

Create from an existing list of DetectionRecords.

Source code in src/render_tag/data_io/readers.py
157
158
159
160
161
162
163
164
@classmethod
def from_records(cls, records: list[DetectionRecord]) -> RenderTagDataset:
    """Create from an existing list of DetectionRecords."""
    # Bypass __init__ (no file I/O); the records are copied to avoid aliasing.
    instance = object.__new__(cls)
    instance._path = Path(".")
    instance._records = list(records)
    instance._index = _build_index(instance._records)
    return instance
get_board_record
get_board_record(image_id: str) -> DetectionRecord | None

The BOARD record for an image, or None.

Source code in src/render_tag/data_io/readers.py
180
181
182
183
184
185
def get_board_record(self, image_id: str) -> DetectionRecord | None:
    """The BOARD record for an image, or None."""
    # First match wins; the index preserves record insertion order.
    for r in self._index.get(image_id, ()):
        if r.record_type == "BOARD":
            return r
    return None
get_calibration_frame
get_calibration_frame(
    image_id: str,
) -> CalibrationFrame | None

Return a CalibrationFrame for the given image, or None.

Source code in src/render_tag/data_io/readers.py
198
199
200
201
202
203
def get_calibration_frame(self, image_id: str) -> CalibrationFrame | None:
    """Return a CalibrationFrame for the given image, or None."""
    # Requires both a BOARD record and its board_definition to be present.
    board = self.get_board_record(image_id)
    if board is not None and board.board_definition is not None:
        return CalibrationFrame(image_id, board)
    return None
get_records
get_records(image_id: str) -> list[DetectionRecord]

All records for a given image.

Source code in src/render_tag/data_io/readers.py
176
177
178
def get_records(self, image_id: str) -> list[DetectionRecord]:
    """All records for a given image."""
    # Return a fresh list so callers cannot mutate the internal index entry.
    return list(self._index.get(image_id, ()))
get_tag_records
get_tag_records(image_id: str) -> list[DetectionRecord]

All TAG records for an image.

Source code in src/render_tag/data_io/readers.py
187
188
189
def get_tag_records(self, image_id: str) -> list[DetectionRecord]:
    """All TAG records for an image."""
    # Unknown image_id yields an empty list rather than raising.
    return [r for r in self._index.get(image_id, ()) if r.record_type == "TAG"]
iter_calibration_frames
iter_calibration_frames() -> Iterator[CalibrationFrame]

Yield a CalibrationFrame for each image that has a BOARD record.

Source code in src/render_tag/data_io/readers.py
191
192
193
194
195
196
def iter_calibration_frames(self) -> Iterator[CalibrationFrame]:
    """Yield a :class:`CalibrationFrame` for each image that has a BOARD record."""
    for image_id in self.image_ids:
        board = self.get_board_record(image_id)
        if board is not None and board.board_definition is not None:
            yield CalibrationFrame(image_id, board)

Functions

unwrap_rich_truth

unwrap_rich_truth(raw: list | dict) -> list[dict]

Extract the records list from either format.

v1 (legacy): a bare JSON array is returned as-is. v2+: {"version": "2.0", "records": [...], ...} has its records list extracted.

Source code in src/render_tag/data_io/readers.py
219
220
221
222
223
224
225
226
227
def unwrap_rich_truth(raw: list | dict) -> list[dict]:
    """Extract the records list from either format.

    v1 (legacy): bare JSON array  → returned as-is
    v2+:         ``{"version": "2.0", "records": [...], ...}`` → records extracted
    """
    if isinstance(raw, dict):
        return raw.get("records", [])
    return raw

render_tag.data_io.writers

Data export writers for render-tag.

This module handles writing detection annotations in various formats: CSV format for corner coordinates, and COCO format for instance segmentation.

Attributes

Classes

AtomicWriter

Mixin for atomic file writing using temp file + rename.

Source code in src/render_tag/data_io/writers.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class AtomicWriter:
    """Mixin for atomic file writing using temp file + rename.

    The payload is written to a sibling temp file, flushed and fsynced,
    then moved over the destination in one filesystem operation so readers
    never observe a partially written file.
    """

    def _write_atomic(self, path: Path, data: Any) -> None:
        """Write *data* to *path* atomically.

        Args:
            path: Final destination path.
            data: dict/list (serialized as indented JSON) or a string
                written verbatim.

        Raises:
            Whatever open/serialize/rename raises; the temp file is removed
            before re-raising.
        """
        # Append ".tmp" instead of swapping the suffix: with_suffix(".tmp")
        # makes siblings like "a.json" and "a.csv" collide on the same "a.tmp".
        temp_path = path.with_name(path.name + ".tmp")
        try:
            with open(temp_path, "w") as f:
                if isinstance(data, (dict, list)):
                    json.dump(data, f, indent=2)
                else:
                    f.write(data)
                f.flush()
                # Force bytes to disk before the rename makes them visible.
                os.fsync(f.fileno())

            # replace() atomically overwrites an existing destination on POSIX
            # and, unlike rename(), also succeeds on Windows when path exists.
            temp_path.replace(path)
        except Exception:
            if temp_path.exists():
                temp_path.unlink()
            raise

BoardConfigWriter

Writer for calibration board configuration.

Source code in src/render_tag/data_io/writers.py
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
class BoardConfigWriter:
    """Writer for calibration board configuration."""

    def __init__(self, output_dir: Path) -> None:
        """Initialize the BoardConfig writer.

        Args:
            output_dir: Root directory for dataset output.
        """
        self.output_dir = output_dir

    def write_config(self, board_config: BoardConfig, filename: str = "board_config.json") -> Path:
        """Save the board configuration to a JSON file.

        Args:
            board_config: The BoardConfig instance to save.
            filename: Name of the output JSON file.

        Returns:
            Path to the written file.
        """
        self.output_dir.mkdir(parents=True, exist_ok=True)
        destination = self.output_dir / filename

        # Pydantic handles serialization; we only persist the text.
        payload = board_config.model_dump_json(indent=2)
        with open(destination, "w") as f:
            f.write(payload)

        return destination
Functions
__init__
__init__(output_dir: Path) -> None

Initialize the BoardConfig writer.

Parameters:

Name Type Description Default
output_dir Path

Root directory for dataset output.

required
Source code in src/render_tag/data_io/writers.py
476
477
478
479
480
481
482
def __init__(self, output_dir: Path) -> None:
    """Initialize the BoardConfig writer.

    Args:
        output_dir: Root directory for dataset output.
    """
    self.output_dir = output_dir
write_config
write_config(
    board_config: BoardConfig,
    filename: str = "board_config.json",
) -> Path

Save the board configuration to a JSON file.

Parameters:

Name Type Description Default
board_config BoardConfig

The BoardConfig instance to save.

required
filename str

Name of the output JSON file.

'board_config.json'

Returns:

Type Description
Path

Path to the written file.

Source code in src/render_tag/data_io/writers.py
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
def write_config(self, board_config: BoardConfig, filename: str = "board_config.json") -> Path:
    """Save the board configuration to a JSON file.

    Args:
        board_config: The BoardConfig instance to save.
        filename: Name of the output JSON file.

    Returns:
        Path to the written file.
    """
    self.output_dir.mkdir(parents=True, exist_ok=True)
    output_path = self.output_dir / filename

    with open(output_path, "w") as f:
        f.write(board_config.model_dump_json(indent=2))

    return output_path

COCOWriter

Bases: AtomicWriter

Writer for COCO format annotations.

Source code in src/render_tag/data_io/writers.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
class COCOWriter(AtomicWriter):
    """Writer for COCO format annotations."""

    def __init__(
        self,
        output_dir: Path,
        filename: str = "annotations.json",
        eval_margin_px: int = 0,
    ) -> None:
        """Initialize the COCO writer."""
        self.output_dir = output_dir
        self.filename = filename
        self._eval_margin_px = eval_margin_px
        self.images: list[dict] = []
        self.annotations: list[dict] = []
        self.categories: list[dict] = []

        self._category_map: dict[str, int] = {}
        self._next_image_id = 1
        self._next_annotation_id = 1
        self._dirty = False

    def _resolve_margin(self, detection: DetectionRecord | None) -> int:
        """Return the effective eval_margin_px: record beats writer default."""
        if detection is not None and detection.eval_margin_px > 0:
            return detection.eval_margin_px
        return self._eval_margin_px

    def add_category(self, name: str, supercategory: str = "fiducial_marker") -> int:
        """Add a category and return its ID."""
        if name in self._category_map:
            return self._category_map[name]

        cat_id = len(self.categories) + 1
        self.categories.append(
            {
                "id": cat_id,
                "name": name,
                "supercategory": supercategory,
                "keypoints": [
                    "tl",
                    "tr",
                    "br",
                    "bl",
                ],  # Industry standard (CW from TL)
                "skeleton": [[1, 2], [2, 3], [3, 4], [4, 1]],  # Edges
            }
        )
        self._category_map[name] = cat_id
        return cat_id

    def add_image(self, file_name: str, width: int, height: int) -> int:
        """Add an image entry and return its ID."""
        image_id = self._next_image_id
        self._next_image_id += 1

        self.images.append(
            {
                "id": image_id,
                "file_name": file_name,
                "width": width,
                "height": height,
            }
        )
        self._dirty = True
        return image_id

    def add_annotation(
        self,
        image_id: int,
        category_id: int,
        corners: list[tuple[float, float]],
        width: int | None = None,
        height: int | None = None,
        detection: DetectionRecord | None = None,
    ) -> int:
        """Add an annotation for a detected tag (optionally clipped)."""
        if corners is None and detection is not None:
            corners = detection.corners

        if corners is None or len(corners) == 0:
            raise ValueError("Annotation must have at least one point")

        annotation_id = self._next_annotation_id
        self._next_annotation_id += 1

        # Keep raw projected coordinates for keypoint visibility.
        # Clip only for the COCO segmentation polygon (COCO compliance).
        raw_corners = list(corners)
        if width is not None or height is not None:
            clipped_corners = [
                (
                    max(0.0, min(float(width or 1e9), c[0])) if c[0] > -999999.0 else c[0],
                    max(0.0, min(float(height or 1e9), c[1])) if c[1] > -999999.0 else c[1],
                )
                for c in corners
            ]
        else:
            clipped_corners = raw_corners

        # Handle point annotations (e.g. saddle points)
        if len(raw_corners) < 3:
            # For 1 or 2 points, bbox is small area around points
            x_coords = [c[0] for c in raw_corners]
            y_coords = [c[1] for c in raw_corners]
            min_x, max_x = min(x_coords), max(x_coords)
            min_y, max_y = min(y_coords), max(y_coords)

            # If it's a single point, give it a tiny 1px box for COCO compatibility
            if min_x == max_x:
                min_x -= 0.5
                max_x += 0.5
            if min_y == max_y:
                min_y -= 0.5
                max_y += 0.5

            bbox = [min_x, min_y, max_x - min_x, max_y - min_y]
            area = (max_x - min_x) * (max_y - min_y)
            segmentation = []  # COCO polygons require >= 3 points
        else:
            # Standard Polygon Path
            dist_coeffs = getattr(detection, "distortion_coeffs", None) or None
            dist_model = getattr(detection, "distortion_model", "none") or "none"

            # 1. Segmentation polygon — compute first so bbox can reuse it.
            #    For KB fisheye, edges are curves in distorted space; densely sample
            #    each edge in 3D camera space before projecting to capture the curved boundary.

            if dist_model == "kannala_brandt" and dist_coeffs:
                dense_poly = compute_dense_distorted_polygon(detection, dist_coeffs, dist_model)
            else:
                dense_poly = None

            if dense_poly is not None:
                segmentation = [coord for pt in dense_poly for coord in pt]
            else:
                segmentation = []
                for corner in clipped_corners:
                    segmentation.extend([corner[0], corner[1]])

            # 2. Bbox — for KB fisheye use dense polygon so curved edges are captured;
            #    for other models fall back to the 3D-reconstruction path in compute_bbox.
            if dense_poly is not None:
                dense_arr = np.array(dense_poly)
                x_min = float(np.min(dense_arr[:, 0]))
                y_min = float(np.min(dense_arr[:, 1]))
                x_max = float(np.max(dense_arr[:, 0]))
                y_max = float(np.max(dense_arr[:, 1]))
                bbox = [x_min, y_min, x_max - x_min, y_max - y_min]
            else:
                bbox = compute_bbox(
                    np.array(raw_corners),
                    detection=detection,
                    distortion_coeffs=dist_coeffs,
                    distortion_model=dist_model,
                )

            # 3. Area uses raw corners.
            area = compute_polygon_area(np.array(raw_corners))

        # Keypoints use raw projected coordinates so out-of-image corners are correctly
        # classified as v=1 (labeled, not visible) rather than clipped to the boundary.
        if len(raw_corners) == 4 and width is not None and height is not None:
            corners_array = np.array(raw_corners)
            vis = compute_eval_visibility(
                corners_array, int(width), int(height), self._resolve_margin(detection)
            )
            keypoints = format_coco_keypoints(corners_array, visibility=vis)
            num_keypoints = int(np.sum(vis))
        elif len(raw_corners) == 4:
            keypoints = format_coco_keypoints(np.array(raw_corners))
            num_keypoints = 4
        else:
            keypoints = format_coco_keypoints(np.array(raw_corners))
            num_keypoints = len(raw_corners)

        if detection and detection.keypoints:
            kp_array = np.array(detection.keypoints)
            if width is not None and height is not None:
                kp_vis = compute_eval_visibility(
                    kp_array, int(width), int(height), self._resolve_margin(detection)
                )
            else:
                sentinel_mask = np.all(kp_array == KEYPOINT_SENTINEL, axis=1)
                kp_vis = ~sentinel_mask
            extra_kp = format_coco_keypoints(kp_array, visibility=kp_vis)
            keypoints.extend(extra_kp)
            num_keypoints += int(np.sum(kp_vis))

        # Prepare attributes: dynamic dump excludes COCO-native fields
        if detection:
            attributes = detection.model_dump(mode="json", exclude=_COCO_NATIVE_FIELDS)
            # Fallback: pixel_area may be 0.0 if not computed; use polygon area instead
            if attributes.get("pixel_area") is None:
                attributes["pixel_area"] = area
            # Merge unstructured metadata without overwriting schema fields
            for k, v in detection.metadata.items():
                if k not in attributes:
                    attributes[k] = v
        else:
            attributes = {}

        self.annotations.append(
            {
                "id": annotation_id,
                "image_id": image_id,
                "category_id": category_id,
                "segmentation": [segmentation] if segmentation else [],
                "bbox": bbox,
                "area": area,
                "keypoints": keypoints,
                "num_keypoints": num_keypoints,
                "iscrowd": 0,
                "attributes": attributes,
            }
        )
        self._dirty = True

        return annotation_id

    def save(self, filename: str | None = None) -> Path:
        """Save the COCO annotations to a JSON file."""
        if filename is None:
            filename = self.filename

        self.output_dir.mkdir(parents=True, exist_ok=True)
        output_path = self.output_dir / filename
        logger.info(
            "Saving COCO annotations",
            path=str(output_path),
            images=len(self.images),
            annotations=len(self.annotations),
        )

        if not self._dirty:
            logger.debug("COCOWriter: No new annotations, skipping save")
            return output_path

        coco_data = {
            "images": self.images,
            "annotations": self.annotations,
            "categories": self.categories,
        }

        self._write_atomic(output_path, coco_data)

        self._dirty = False
        return output_path
Functions
__init__
__init__(
    output_dir: Path,
    filename: str = "annotations.json",
    eval_margin_px: int = 0,
) -> None

Initialize the COCO writer.

Source code in src/render_tag/data_io/writers.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def __init__(
    self,
    output_dir: Path,
    filename: str = "annotations.json",
    eval_margin_px: int = 0,
) -> None:
    """Initialize the COCO writer."""
    self.output_dir = output_dir
    self.filename = filename
    self._eval_margin_px = eval_margin_px
    self.images: list[dict] = []
    self.annotations: list[dict] = []
    self.categories: list[dict] = []

    self._category_map: dict[str, int] = {}
    self._next_image_id = 1
    self._next_annotation_id = 1
    self._dirty = False
add_annotation
add_annotation(
    image_id: int,
    category_id: int,
    corners: list[tuple[float, float]],
    width: int | None = None,
    height: int | None = None,
    detection: DetectionRecord | None = None,
) -> int

Add an annotation for a detected tag (optionally clipped).

Source code in src/render_tag/data_io/writers.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
def add_annotation(
    self,
    image_id: int,
    category_id: int,
    corners: list[tuple[float, float]],
    width: int | None = None,
    height: int | None = None,
    detection: DetectionRecord | None = None,
) -> int:
    """Add an annotation for a detected tag (optionally clipped).

    Args:
        image_id: COCO image id the annotation belongs to.
        category_id: COCO category id for the marker.
        corners: Projected corner coordinates; if None, taken from *detection*.
        width: Image width in pixels; enables clipping and visibility checks.
        height: Image height in pixels; enables clipping and visibility checks.
        detection: Optional source record supplying distortion parameters,
            extra keypoints and attribute metadata.

    Returns:
        The newly assigned annotation id.

    Raises:
        ValueError: If no corner points are available at all.
    """
    if corners is None and detection is not None:
        corners = detection.corners

    if corners is None or len(corners) == 0:
        raise ValueError("Annotation must have at least one point")

    annotation_id = self._next_annotation_id
    self._next_annotation_id += 1

    # Keep raw projected coordinates for keypoint visibility.
    # Clip only for the COCO segmentation polygon (COCO compliance).
    # NOTE(review): values at or below -999999.0 appear to be off-screen
    # sentinels and are intentionally left unclipped — confirm against
    # KEYPOINT_SENTINEL's definition.
    raw_corners = list(corners)
    if width is not None or height is not None:
        clipped_corners = [
            (
                max(0.0, min(float(width or 1e9), c[0])) if c[0] > -999999.0 else c[0],
                max(0.0, min(float(height or 1e9), c[1])) if c[1] > -999999.0 else c[1],
            )
            for c in corners
        ]
    else:
        clipped_corners = raw_corners

    # Handle point annotations (e.g. saddle points)
    if len(raw_corners) < 3:
        # For 1 or 2 points, bbox is small area around points
        x_coords = [c[0] for c in raw_corners]
        y_coords = [c[1] for c in raw_corners]
        min_x, max_x = min(x_coords), max(x_coords)
        min_y, max_y = min(y_coords), max(y_coords)

        # If it's a single point, give it a tiny 1px box for COCO compatibility
        if min_x == max_x:
            min_x -= 0.5
            max_x += 0.5
        if min_y == max_y:
            min_y -= 0.5
            max_y += 0.5

        bbox = [min_x, min_y, max_x - min_x, max_y - min_y]
        area = (max_x - min_x) * (max_y - min_y)
        segmentation = []  # COCO polygons require >= 3 points
    else:
        # Standard Polygon Path
        dist_coeffs = getattr(detection, "distortion_coeffs", None) or None
        dist_model = getattr(detection, "distortion_model", "none") or "none"

        # 1. Segmentation polygon — compute first so bbox can reuse it.
        #    For KB fisheye, edges are curves in distorted space; densely sample
        #    each edge in 3D camera space before projecting to capture the curved boundary.

        if dist_model == "kannala_brandt" and dist_coeffs:
            dense_poly = compute_dense_distorted_polygon(detection, dist_coeffs, dist_model)
        else:
            dense_poly = None

        if dense_poly is not None:
            segmentation = [coord for pt in dense_poly for coord in pt]
        else:
            # Flatten clipped corners into COCO's [x1, y1, x2, y2, ...] layout.
            segmentation = []
            for corner in clipped_corners:
                segmentation.extend([corner[0], corner[1]])

        # 2. Bbox — for KB fisheye use dense polygon so curved edges are captured;
        #    for other models fall back to the 3D-reconstruction path in compute_bbox.
        if dense_poly is not None:
            dense_arr = np.array(dense_poly)
            x_min = float(np.min(dense_arr[:, 0]))
            y_min = float(np.min(dense_arr[:, 1]))
            x_max = float(np.max(dense_arr[:, 0]))
            y_max = float(np.max(dense_arr[:, 1]))
            bbox = [x_min, y_min, x_max - x_min, y_max - y_min]
        else:
            bbox = compute_bbox(
                np.array(raw_corners),
                detection=detection,
                distortion_coeffs=dist_coeffs,
                distortion_model=dist_model,
            )

        # 3. Area uses raw corners.
        area = compute_polygon_area(np.array(raw_corners))

    # Keypoints use raw projected coordinates so out-of-image corners are correctly
    # classified as v=1 (labeled, not visible) rather than clipped to the boundary.
    if len(raw_corners) == 4 and width is not None and height is not None:
        corners_array = np.array(raw_corners)
        vis = compute_eval_visibility(
            corners_array, int(width), int(height), self._resolve_margin(detection)
        )
        keypoints = format_coco_keypoints(corners_array, visibility=vis)
        num_keypoints = int(np.sum(vis))
    elif len(raw_corners) == 4:
        # No image dimensions available — assume all four corners visible.
        keypoints = format_coco_keypoints(np.array(raw_corners))
        num_keypoints = 4
    else:
        keypoints = format_coco_keypoints(np.array(raw_corners))
        num_keypoints = len(raw_corners)

    # Append any extra keypoints carried by the detection record (beyond the
    # four corners), with visibility derived the same way.
    if detection and detection.keypoints:
        kp_array = np.array(detection.keypoints)
        if width is not None and height is not None:
            kp_vis = compute_eval_visibility(
                kp_array, int(width), int(height), self._resolve_margin(detection)
            )
        else:
            # Without dimensions, only sentinel-valued points count as hidden.
            sentinel_mask = np.all(kp_array == KEYPOINT_SENTINEL, axis=1)
            kp_vis = ~sentinel_mask
        extra_kp = format_coco_keypoints(kp_array, visibility=kp_vis)
        keypoints.extend(extra_kp)
        num_keypoints += int(np.sum(kp_vis))

    # Prepare attributes: dynamic dump excludes COCO-native fields
    if detection:
        attributes = detection.model_dump(mode="json", exclude=_COCO_NATIVE_FIELDS)
        # Fallback: pixel_area may be 0.0 if not computed; use polygon area instead
        if attributes.get("pixel_area") is None:
            attributes["pixel_area"] = area
        # Merge unstructured metadata without overwriting schema fields
        for k, v in detection.metadata.items():
            if k not in attributes:
                attributes[k] = v
    else:
        attributes = {}

    self.annotations.append(
        {
            "id": annotation_id,
            "image_id": image_id,
            "category_id": category_id,
            "segmentation": [segmentation] if segmentation else [],
            "bbox": bbox,
            "area": area,
            "keypoints": keypoints,
            "num_keypoints": num_keypoints,
            "iscrowd": 0,
            "attributes": attributes,
        }
    )
    self._dirty = True

    return annotation_id
add_category
add_category(
    name: str, supercategory: str = "fiducial_marker"
) -> int

Add a category and return its ID.

Source code in src/render_tag/data_io/writers.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def add_category(self, name: str, supercategory: str = "fiducial_marker") -> int:
    """Register *name* as a COCO category and return its ID (idempotent)."""
    existing = self._category_map.get(name)
    if existing is not None:
        return existing

    cat_id = len(self.categories) + 1
    entry = {
        "id": cat_id,
        "name": name,
        "supercategory": supercategory,
        # Industry standard (CW from TL)
        "keypoints": ["tl", "tr", "br", "bl"],
        "skeleton": [[1, 2], [2, 3], [3, 4], [4, 1]],  # Edges
    }
    self.categories.append(entry)
    self._category_map[name] = cat_id
    return cat_id
add_image
add_image(file_name: str, width: int, height: int) -> int

Add an image entry and return its ID.

Source code in src/render_tag/data_io/writers.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def add_image(self, file_name: str, width: int, height: int) -> int:
    """Append an image entry and return its freshly assigned ID."""
    image_id = self._next_image_id
    self._next_image_id += 1

    entry = {
        "id": image_id,
        "file_name": file_name,
        "width": width,
        "height": height,
    }
    self.images.append(entry)

    # Mark unsaved state so the next save() actually writes.
    self._dirty = True
    return image_id
save
save(filename: str | None = None) -> Path

Save the COCO annotations to a JSON file.

Source code in src/render_tag/data_io/writers.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
def save(self, filename: str | None = None) -> Path:
    """Save the COCO annotations to a JSON file.

    Args:
        filename: Output file name; defaults to the name given at init.

    Returns:
        Path to the annotation file (existing one when nothing was dirty).
    """
    if filename is None:
        filename = self.filename

    self.output_dir.mkdir(parents=True, exist_ok=True)
    output_path = self.output_dir / filename

    # Check dirtiness BEFORE logging "Saving": previously the info line was
    # emitted even when the write was skipped, which misled log readers.
    if not self._dirty:
        logger.debug("COCOWriter: No new annotations, skipping save")
        return output_path

    logger.info(
        "Saving COCO annotations",
        path=str(output_path),
        images=len(self.images),
        annotations=len(self.annotations),
    )

    coco_data = {
        "images": self.images,
        "annotations": self.annotations,
        "categories": self.categories,
    }

    # AtomicWriter mixin: temp file + atomic rename.
    self._write_atomic(output_path, coco_data)

    self._dirty = False
    return output_path

CSVWriter

Writes detection data to a CSV file.

Keeps the file handle open for the writer's lifetime to avoid per-row open/close overhead. Call close() or use as a context manager to flush.

Source code in src/render_tag/data_io/writers.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class CSVWriter:
    """Writes detection data to a CSV file.

    Keeps the file handle open for the writer's lifetime to avoid per-row
    open/close overhead. Call close() or use as a context manager to flush.
    """

    def __init__(self, output_path: Path) -> None:
        """Initialize the CSV writer."""
        self.output_path = output_path
        self._initialized = False
        self._file = None
        self._writer = None

    def _ensure_initialized(self, num_corners: int = 4, num_keypoints: int = 0) -> None:
        """Open the output file and emit the header row exactly once."""
        if self._initialized:
            return

        self.output_path.parent.mkdir(parents=True, exist_ok=True)
        self._file = open(self.output_path, "w", newline="")  # noqa: SIM115
        self._writer = csv.writer(self._file)
        self._writer.writerow(DetectionRecord.csv_header(num_corners, num_keypoints))
        self._file.flush()
        self._initialized = True

    def write_detection(
        self,
        detection: DetectionRecord,
        width: int | None = None,
        height: int | None = None,
    ) -> None:
        """Write a single detection to the CSV file (optionally clipped)."""
        # Calibration records might not have 4 corners; only invalid TAGs skip.
        if detection.record_type == "TAG" and not detection.is_valid():
            return

        keypoints = detection.keypoints
        self._ensure_initialized(
            num_corners=len(detection.corners),
            num_keypoints=len(keypoints) if keypoints else 0,
        )

        # The record itself knows how to serialize to a CSV row.
        self._writer.writerow(detection.to_csv_row(width=width, height=height))

    def write_detections(self, detections: list[DetectionRecord]) -> None:
        """Write multiple detections to the CSV file."""
        for record in detections:
            self.write_detection(record)

    def close(self) -> None:
        """Flush and close the underlying file handle."""
        handle = self._file
        if handle and not handle.closed:
            handle.flush()
            os.fsync(handle.fileno())
            handle.close()

    def __del__(self) -> None:
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
Functions
__init__
__init__(output_path: Path) -> None

Initialize the CSV writer.

Source code in src/render_tag/data_io/writers.py
79
80
81
82
83
84
def __init__(self, output_path: Path) -> None:
    """Initialize the CSV writer."""
    self.output_path = output_path
    self._initialized = False
    self._file = None
    self._writer = None
close
close() -> None

Flush and close the underlying file handle.

Source code in src/render_tag/data_io/writers.py
123
124
125
126
127
128
def close(self) -> None:
    """Flush and close the underlying file handle."""
    if self._file and not self._file.closed:
        self._file.flush()
        os.fsync(self._file.fileno())
        self._file.close()
write_detection
write_detection(
    detection: DetectionRecord,
    width: int | None = None,
    height: int | None = None,
) -> None

Write a single detection to the CSV file (optionally clipped).

Source code in src/render_tag/data_io/writers.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def write_detection(
    self,
    detection: DetectionRecord,
    width: int | None = None,
    height: int | None = None,
) -> None:
    """Write a single detection to the CSV file (optionally clipped)."""
    # Calibration records might not have 4 corners
    if detection.record_type == "TAG" and not detection.is_valid():
        return

    self._ensure_initialized(
        num_corners=len(detection.corners),
        num_keypoints=len(detection.keypoints) if detection.keypoints else 0,
    )

    # Delegate CSV formatting to the data record
    row = detection.to_csv_row(width=width, height=height)
    self._writer.writerow(row)
write_detections
write_detections(detections: list[DetectionRecord]) -> None

Write multiple detections to the CSV file.

Source code in src/render_tag/data_io/writers.py
118
119
120
121
def write_detections(self, detections: list[DetectionRecord]) -> None:
    """Write multiple detections to the CSV file."""
    for detection in detections:
        self.write_detection(detection)

ProvenanceWriter

Bases: AtomicWriter

Writer for a unified dataset provenance mapping (image_id -> SceneRecipe).

Source code in src/render_tag/data_io/writers.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
class ProvenanceWriter(AtomicWriter):
    """Writer for a unified dataset provenance mapping (image_id -> SceneRecipe)."""

    def __init__(self, output_path: Path) -> None:
        self.output_path = output_path
        self._provenance: dict[str, Any] = {}

    def add_provenance(self, image_id: str, provenance: dict[str, Any] | SceneProvenance) -> None:
        """Add provenance for a single image."""
        # Pydantic models expose model_dump; plain dicts are stored verbatim.
        dump = getattr(provenance, "model_dump", None)
        if callable(dump):
            self._provenance[image_id] = dump(mode="json")
        else:
            self._provenance[image_id] = provenance

    def save(self) -> Path:
        """Save the unified provenance mapping to a JSON file."""
        self.output_path.parent.mkdir(parents=True, exist_ok=True)
        self._write_atomic(self.output_path, self._provenance)
        return self.output_path
Functions
add_provenance
add_provenance(
    image_id: str,
    provenance: dict[str, Any] | SceneProvenance,
) -> None

Add provenance for a single image.

Source code in src/render_tag/data_io/writers.py
460
461
462
463
464
def add_provenance(self, image_id: str, provenance: dict[str, Any] | SceneProvenance) -> None:
    """Add provenance for a single image."""
    model_dump = getattr(provenance, "model_dump", None)
    data = model_dump(mode="json") if callable(model_dump) else provenance
    self._provenance[image_id] = data
save
save() -> Path

Save the unified provenance mapping to a JSON file.

Source code in src/render_tag/data_io/writers.py
466
467
468
469
470
def save(self) -> Path:
    """Write the accumulated provenance mapping to ``output_path`` as JSON.

    Parent directories are created on demand; the write itself goes through
    the atomic-write helper so readers never observe a partial file.

    Returns:
        The path the mapping was written to.
    """
    target = self.output_path
    target.parent.mkdir(parents=True, exist_ok=True)
    self._write_atomic(target, self._provenance)
    return target

RichTruthWriter

Bases: AtomicWriter

Writer for structured JSON 'Data Product' containing all metadata.

Source code in src/render_tag/data_io/writers.py
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
class RichTruthWriter(AtomicWriter):
    """Writer for structured JSON 'Data Product' containing all metadata."""

    SCHEMA_VERSION = "2.0"
    TRUNCATION_POLICY = "ternary_visibility"

    def __init__(self, output_path: Path, eval_margin_px: int = 0) -> None:
        self.output_path = output_path
        self._eval_margin_px = eval_margin_px
        # Tracks the largest margin actually applied, so save() can report it.
        self._effective_margin_px = eval_margin_px
        self._detections: list[dict] = []

    def add_detection(self, detection: DetectionRecord) -> None:
        """Add a detection record, computing per-point visibility flags."""
        record = detection.model_dump(mode="json")

        # Per-record margin (when positive) beats the writer-wide default.
        margin = detection.eval_margin_px if detection.eval_margin_px > 0 else self._eval_margin_px
        self._effective_margin_px = max(self._effective_margin_px, margin)

        res = detection.resolution
        if res and len(res) == 2 and GEOMETRY_AVAILABLE:
            width, height = int(res[0]), int(res[1])
            corner_flags = compute_eval_visibility_ternary(
                np.array(detection.corners), width, height, margin
            ).tolist()
            record["corners_visibility"] = corner_flags

            keypoint_flags = None
            if detection.keypoints:
                keypoint_flags = compute_eval_visibility_ternary(
                    np.array(detection.keypoints), width, height, margin
                ).tolist()
                record["keypoints_visibility"] = keypoint_flags

            # eval_complete is only meaningful for TAG records (4-corner geometry).
            # BOARD records carry per-point keypoints_visibility so callers can
            # filter saddle points individually; a single boolean rollup is not useful.
            if keypoint_flags is None:
                record["eval_complete"] = all(
                    flag == KeypointVisibility.VISIBLE for flag in corner_flags
                )

        self._detections.append(record)

    def save(self) -> Path:
        """Save all detections wrapped in a versioned envelope.

        The evaluation_context reflects the actual margin used for visibility
        tagging. Because _effective_margin_px is maintained incrementally in
        add_detection(), assembling the envelope here is O(1) beyond
        serialization.
        """
        envelope = {
            "version": self.SCHEMA_VERSION,
            "evaluation_context": {
                "photometric_margin_px": self._effective_margin_px,
                "truncation_policy": self.TRUNCATION_POLICY,
            },
            "records": self._detections,
        }
        self.output_path.parent.mkdir(parents=True, exist_ok=True)
        self._write_atomic(self.output_path, envelope)
        return self.output_path
Functions
add_detection
add_detection(detection: DetectionRecord) -> None

Add a detection record, computing per-point visibility flags.

Source code in src/render_tag/data_io/writers.py
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def add_detection(self, detection: DetectionRecord) -> None:
    """Add a detection record, computing per-point visibility flags."""
    record = detection.model_dump(mode="json")

    # Use margin from record if available, fallback to writer default
    margin = detection.eval_margin_px if detection.eval_margin_px > 0 else self._eval_margin_px
    if margin > self._effective_margin_px:
        self._effective_margin_px = margin

    res = detection.resolution
    if res and len(res) == 2 and GEOMETRY_AVAILABLE:
        w, h = int(res[0]), int(res[1])
        corners_arr = np.array(detection.corners)
        corners_vis = compute_eval_visibility_ternary(corners_arr, w, h, margin).tolist()
        record["corners_visibility"] = corners_vis

        kp_vis = None
        if detection.keypoints:
            kp_arr = np.array(detection.keypoints)
            kp_vis = compute_eval_visibility_ternary(kp_arr, w, h, margin).tolist()
            record["keypoints_visibility"] = kp_vis

        # eval_complete is only meaningful for TAG records (4-corner geometry).
        # BOARD records carry per-point keypoints_visibility so callers can
        # filter saddle points individually; a single boolean rollup is not useful.
        if kp_vis is None:
            _VISIBLE = KeypointVisibility.VISIBLE
            record["eval_complete"] = all(v == _VISIBLE for v in corners_vis)

    self._detections.append(record)
save
save() -> Path

Save all detections wrapped in a versioned envelope.

The evaluation_context reflects the actual margin used for visibility tagging. When records carry their own eval_margin_px (per-record beats writer default), _effective_margin_px is kept up-to-date during add_detection() so this is O(1).

Source code in src/render_tag/data_io/writers.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
def save(self) -> Path:
    """Persist all detections wrapped in a versioned envelope.

    The evaluation_context reports the margin actually used for visibility
    tagging. Because _effective_margin_px is kept current during
    add_detection(), assembling the envelope here is O(1).

    Returns:
        The path the envelope was written to.
    """
    envelope = {
        "version": self.SCHEMA_VERSION,
        "evaluation_context": {
            "photometric_margin_px": self._effective_margin_px,
            "truncation_policy": self.TRUNCATION_POLICY,
        },
        "records": self._detections,
    }
    self.output_path.parent.mkdir(parents=True, exist_ok=True)
    self._write_atomic(self.output_path, envelope)
    return self.output_path

Functions

merge_all_shards

merge_all_shards(
    output_dir: Path, cleanup: bool = True
) -> None

Merge every per-worker shard file in output_dir into its canonical name.

Source code in src/render_tag/data_io/writers.py
694
695
696
697
698
699
def merge_all_shards(output_dir: Path, cleanup: bool = True) -> None:
    """Merge every per-worker shard file in ``output_dir`` into its canonical name."""
    # One (merger, canonical filename) pair per artifact type.
    shard_mergers = (
        (merge_csv_shards, "ground_truth.csv"),
        (merge_coco_shards, "coco_labels.json"),
        (merge_rich_truth_shards, "rich_truth.json"),
        (merge_provenance_shards, "provenance.json"),
    )
    for merge, canonical_name in shard_mergers:
        merge(output_dir, final_filename=canonical_name, cleanup=cleanup)

merge_coco_shards

merge_coco_shards(
    output_dir: Path,
    final_filename: str = "coco_labels.json",
    cleanup: bool = False,
)

Merge multiple COCO JSON shards into a single canonical file.

Source code in src/render_tag/data_io/writers.py
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
def merge_coco_shards(
    output_dir: Path, final_filename: str = "coco_labels.json", cleanup: bool = False
):
    """Merge multiple COCO JSON shards into a single canonical file.

    Image and annotation ids are re-based with per-shard offsets (max old id
    seen in the shard, plus one) so the merged file has no id collisions
    across shards.

    Args:
        output_dir: Directory containing ``coco_shard_*.json`` files.
        final_filename: Name of the merged output file.
        cleanup: When True, delete the shard files after a successful merge.
    """
    shard_files = sorted(output_dir.glob("coco_shard_*.json"))
    if not shard_files:
        logger.warning(f"No COCO shards found in {output_dir}")
        return

    master_data = {"images": [], "annotations": [], "categories": []}
    global_image_id_offset = 0
    global_ann_id_offset = 0

    for shard_path in shard_files:
        with open(shard_path) as f:
            shard_data = json.load(f)

        # Take categories from the first shard that actually has them.
        # (A boolean "first shard wins" latch previously let an empty first
        # shard mask every later shard's category table.)
        if not master_data["categories"]:
            master_data["categories"] = shard_data.get("categories", [])

        image_id_map = {}
        max_image_id = 0
        for img in shard_data.get("images", []):
            old_id = img["id"]
            new_id = old_id + global_image_id_offset
            image_id_map[old_id] = new_id
            img["id"] = new_id
            master_data["images"].append(img)
            max_image_id = max(max_image_id, old_id)

        max_ann_id = 0
        for ann in shard_data.get("annotations", []):
            old_id = ann["id"]
            ann["id"] = old_id + global_ann_id_offset
            # Orphaned annotations keep their original image_id rather
            # than being dropped.
            ann["image_id"] = image_id_map.get(ann["image_id"], ann["image_id"])
            master_data["annotations"].append(ann)
            max_ann_id = max(max_ann_id, old_id)

        global_image_id_offset += max_image_id + 1
        global_ann_id_offset += max_ann_id + 1

    final_path = output_dir / final_filename
    _write_json_atomic(final_path, master_data)
    logger.info(f"Merged {len(shard_files)} shards into {final_path}")

    if cleanup:
        for shard_path in shard_files:
            shard_path.unlink()
        logger.info("Cleaned up COCO shard files.")

merge_csv_shards

merge_csv_shards(
    output_dir: Path,
    final_filename: str = "ground_truth.csv",
    cleanup: bool = False,
)

Merge multiple CSV shards into a single canonical file.

Source code in src/render_tag/data_io/writers.py
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
def merge_csv_shards(
    output_dir: Path, final_filename: str = "ground_truth.csv", cleanup: bool = False
):
    """Merge multiple CSV shards into a single canonical file.

    The header row comes from the first shard that has one; data rows from
    every shard are concatenated in sorted-filename order. The merge is
    crash-safe: rows stream to a temp file that is fsync'd and renamed into
    place only on success.
    """
    shard_files = sorted(output_dir.glob("tags_shard_*.csv"))
    if not shard_files:
        logger.warning(f"No CSV shards found in {output_dir}")
        return

    final_path = output_dir / final_filename
    temp_path = final_path.with_suffix(".tmp")
    try:
        with open(temp_path, "w", newline="") as fout:
            writer = csv.writer(fout)
            header_written = False
            for shard_path in shard_files:
                with open(shard_path, newline="") as fin:
                    reader = csv.reader(fin)
                    # Consume each shard's header; emit it only once.
                    header = next(reader, None)
                    if header and not header_written:
                        writer.writerow(header)
                        header_written = True
                    writer.writerows(reader)
            fout.flush()
            os.fsync(fout.fileno())
        temp_path.rename(final_path)
    except Exception:
        # Never leave a half-written temp file behind.
        if temp_path.exists():
            temp_path.unlink()
        raise

    logger.info(f"Merged {len(shard_files)} shards into {final_path}")

    if cleanup:
        for shard_path in shard_files:
            shard_path.unlink()
        logger.info("Cleaned up CSV shard files.")

merge_provenance_shards

merge_provenance_shards(
    output_dir: Path,
    final_filename: str = "provenance.json",
    cleanup: bool = False,
)

Merge multiple provenance JSON shards into a single canonical file.

Source code in src/render_tag/data_io/writers.py
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
def merge_provenance_shards(
    output_dir: Path, final_filename: str = "provenance.json", cleanup: bool = False
):
    """Merge multiple provenance JSON shards into a single canonical file.

    Shards are merged in sorted-filename order via dict.update(), so a
    later shard wins on duplicate image ids.
    """
    shard_files = sorted(output_dir.glob("provenance_shard_*.json"))
    if not shard_files:
        return

    merged: dict = {}
    for shard_path in shard_files:
        with open(shard_path) as f:
            merged.update(json.load(f))

    final_path = output_dir / final_filename
    _write_json_atomic(final_path, merged)
    logger.info(f"Merged {len(shard_files)} shards into {final_path}")

    if cleanup:
        for shard_path in shard_files:
            shard_path.unlink()
        logger.info("Cleaned up provenance shard files.")

merge_rich_truth_shards

merge_rich_truth_shards(
    output_dir: Path,
    final_filename: str = "rich_truth.json",
    cleanup: bool = False,
)

Merge multiple RichTruth JSON shards into a single canonical file.

Source code in src/render_tag/data_io/writers.py
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
def merge_rich_truth_shards(
    output_dir: Path, final_filename: str = "rich_truth.json", cleanup: bool = False
):
    """Merge multiple RichTruth JSON shards into a single canonical file.

    Records from all shards are concatenated. If any shard carried an
    evaluation_context, the merged file is wrapped in a versioned envelope
    whose photometric margin is the maximum seen across shards; otherwise a
    bare record list is written.
    """
    shard_files = sorted(output_dir.glob("rich_truth_shard_*.json"))
    if not shard_files:
        logger.warning(f"No RichTruth shards found in {output_dir}")
        return

    all_records: list[dict] = []
    eval_ctx: dict = {}
    max_margin = 0

    for shard_path in shard_files:
        with open(shard_path) as f:
            shard = json.load(f)
        all_records.extend(unwrap_rich_truth(shard))
        if isinstance(shard, dict):
            ctx = shard.get("evaluation_context", {})
            # First non-empty context becomes the template...
            if not eval_ctx:
                eval_ctx = ctx.copy()
            # ...but the margin is the maximum across all shards.
            max_margin = max(max_margin, ctx.get("photometric_margin_px", 0))

    if not eval_ctx:
        # Legacy shape: no context anywhere, emit a bare record list.
        payload: dict | list[dict] = all_records
    else:
        if max_margin > 0:
            eval_ctx["photometric_margin_px"] = max_margin
        payload = {
            "version": RichTruthWriter.SCHEMA_VERSION,
            "evaluation_context": eval_ctx,
            "records": all_records,
        }

    final_path = output_dir / final_filename
    _write_json_atomic(final_path, payload)
    logger.info(f"Merged {len(shard_files)} shards into {final_path}")

    if cleanup:
        for shard_path in shard_files:
            shard_path.unlink()
        logger.info("Cleaned up RichTruth shard files.")

Dataset quality verification and telemetry analysis.

render_tag.audit.auditor

Unified auditing and telemetry for render-tag.

Provides data ingestion, geometric/environmental auditing, quality gates, and worker telemetry analysis using Polars.

Classes

AuditDiff

Detects statistical drift between two audit reports.

Source code in src/render_tag/audit/auditor.py
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
class AuditDiff:
    """Detects statistical drift between two audit reports."""

    def __init__(self, report_a: AuditReport, report_b: AuditReport) -> None:
        # Convention: all deltas are computed as report_b minus report_a.
        self.report_a = report_a
        self.report_b = report_b

    def calculate(self) -> dict[str, Any]:
        """Return per-metric deltas (B minus A) between the two reports."""
        a, b = self.report_a, self.report_b
        ga, gb = a.geometric, b.geometric
        ia, ib = a.integrity, b.integrity
        ea, eb = a.environmental, b.environmental
        diffs = {
            "tag_count": gb.tag_count - ga.tag_count,
            "image_count": gb.image_count - ga.image_count,
            "distance_mean_diff": gb.distance.mean - ga.distance.mean,
            "distance_std_diff": gb.distance.std - ga.distance.std,
            "incidence_angle_max_diff": gb.incidence_angle.max - ga.incidence_angle.max,
            "incidence_angle_mean_diff": gb.incidence_angle.mean - ga.incidence_angle.mean,
            "impossible_poses_diff": ib.impossible_poses - ia.impossible_poses,
            "corrupted_frames_diff": ib.corrupted_frames - ia.corrupted_frames,
            "orphaned_tags_diff": ib.orphaned_tags - ia.orphaned_tags,
            "chirality_failures_diff": ib.chirality_failures - ia.chirality_failures,
            "orientation_failures_diff": ib.orientation_failures - ia.orientation_failures,
            "margin_violations_diff": ib.margin_violations - ia.margin_violations,
            "lighting_intensity_mean_diff": (
                eb.lighting_intensity.mean - ea.lighting_intensity.mean
            ),
            "score_diff": b.score - a.score,
        }
        return diffs

AuditReport

Bases: BaseModel

Complete audit report for a dataset.

Source code in src/render_tag/audit/auditor.py
89
90
91
92
93
94
95
96
97
class AuditReport(BaseModel):
    """Complete audit report for a dataset.

    Aggregates the three audit passes (geometric, environmental, integrity)
    plus a heuristic quality score.
    """

    dataset_name: str  # Name of the audited dataset (directory name).
    timestamp: str  # ISO-8601 timestamp of when the audit ran.
    geometric: GeometricAudit  # Distance/incidence-angle/ppm distribution stats.
    environmental: EnvironmentalAudit  # Lighting-intensity statistics.
    integrity: IntegrityAudit  # Invariant-violation counters.
    score: float = 0.0  # Heuristic quality score, clamped to [0, 100].

AuditResult

Bases: BaseModel

Final result of an audit run, including gates.

Source code in src/render_tag/audit/auditor.py
117
118
119
120
121
122
123
class AuditResult(BaseModel):
    """Final result of an audit run, including gates."""

    report: AuditReport  # The full metrics report.
    gate_passed: bool = True  # False when any quality-gate check failed.
    gate_failures: list[str] = Field(default_factory=list)  # Human-readable failure messages.
    quarantined: bool = False  # True when integrity failures warrant quarantining the dataset.

DatasetAuditor

Orchestrates the full audit of a dataset.

Coordinates geometric, environmental, and integrity checks to produce a comprehensive quality report and gate status.

Attributes:

Name Type Description
dataset_path

Path to the dataset root.

reader

Helper for high-speed data ingestion.

Source code in src/render_tag/audit/auditor.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
class DatasetAuditor:
    """Orchestrates the full audit of a dataset.

    Coordinates geometric, environmental, and integrity checks to produce
    a comprehensive quality report and gate status.

    Attributes:
        dataset_path: Path to the dataset root.
        reader: Helper for high-speed data ingestion.
    """

    def __init__(self, dataset_path: Path) -> None:
        """Initialize the DatasetAuditor.

        Args:
            dataset_path: Directory containing the images and metadata.
        """
        self.dataset_path = dataset_path
        self.reader = DatasetReader(dataset_path)

    def run_audit(self, gate_config: QualityGateConfig | None = None) -> AuditResult:
        """Execute all audit passes and evaluate quality gates.

        Args:
            gate_config: Optional configuration for metric-based pass/fail gates.

        Returns:
            An AuditResult containing the full report and gate status.
        """
        df = self.reader.load_rich_detections()
        raw_records, eval_ctx = self.reader.load_raw_records()

        # Column aggregations may return None on empty data; the `or 0`
        # coercion keeps DistributionStats well-defined in that case.
        def get_stats(col):
            if col not in df.columns:
                return DistributionStats(min=0, max=0, mean=0, std=0, median=0)
            s = df[col]
            return DistributionStats(
                min=float(s.min() or 0),
                max=float(s.max() or 0),
                mean=float(s.mean() or 0),
                std=float(s.std() or 0),
                median=float(s.median() or 0),
            )

        geom = GeometricAudit(
            distance=get_stats("distance"),
            incidence_angle=get_stats("angle_of_incidence"),
            ppm=get_stats("ppm"),
            tag_count=len(df),
            image_count=df["image_id"].n_unique() if "image_id" in df.columns else 0,
        )
        env = EnvironmentalAudit(lighting_intensity=get_stats("lighting_intensity"))

        # Integrity passes operate on the raw JSON records, not the dataframe.
        chirality_failures = self._run_chirality_check(raw_records)
        orientation_failures, dict_orient_error = self._run_anchor_check(raw_records)
        margin_px = int(eval_ctx.get("photometric_margin_px", 0))
        margin_violations = self._run_margin_check(raw_records, margin_px)

        integrity = IntegrityAudit(
            impossible_poses=int(df.filter(pl.col("distance") < 0).height)
            if "distance" in df.columns
            else 0,
            chirality_failures=chirality_failures,
            orientation_failures=orientation_failures,
            dictionary_orientation_error=dict_orient_error,
            margin_violations=margin_violations,
        )

        report = AuditReport(
            dataset_name=self.dataset_path.name,
            timestamp=datetime.now(UTC).isoformat(),
            geometric=geom,
            environmental=env,
            integrity=integrity,
            score=self._calculate_score(geom, env, integrity),
        )

        gate_passed = True
        gate_failures = []
        if gate_config:
            # Simple gate logic for tests
            pass

        # Any geometric-invariant failure quarantines the dataset outright.
        quarantined = chirality_failures > 0 or orientation_failures > 0 or margin_violations > 0

        if chirality_failures > 0:
            gate_failures.append(f"CHIRALITY: {chirality_failures} tag(s) have wrong winding order")
            gate_passed = False
        if orientation_failures > 0:
            msg = f"ORIENTATION: {orientation_failures} tag(s) fail 3D anchor projection"
            if dict_orient_error:
                msg += " [DictionaryOrientationError: texture is 180° out of phase]"
            gate_failures.append(msg)
            gate_passed = False
        if margin_violations > 0:
            gate_failures.append(
                f"MARGIN: {margin_violations} corner(s) marked VISIBLE inside the "
                f"{margin_px}px eval margin — projection bug detected"
            )
            gate_passed = False

        return AuditResult(
            report=report,
            gate_passed=gate_passed,
            gate_failures=gate_failures,
            quarantined=quarantined,
        )

    def _calculate_score(
        self, geom: GeometricAudit, env: EnvironmentalAudit, integrity: IntegrityAudit
    ) -> float:
        """Calculate a heuristic quality score (0-100).

        NOTE(review): `env` is accepted for signature symmetry but is not
        currently used in the scoring formula.
        """
        if geom.tag_count == 0:
            return 0.0
        score = 100.0
        # Each impossible pose costs 10 points flat.
        score -= integrity.impossible_poses * 10
        if geom.tag_count > 0:
            # Invariant failures are penalized proportionally to dataset size.
            score -= (integrity.chirality_failures / geom.tag_count) * 50
            score -= (integrity.orientation_failures / geom.tag_count) * 50
        # Penalize low view-angle diversity (max incidence under 45°)...
        if geom.incidence_angle.max < 45:
            score -= 20
        # ...and a narrow distance range (< 1.0 in dataset units).
        if geom.distance.max - geom.distance.min < 1.0:
            score -= 10
        return float(max(0.0, min(100.0, score)))

    def _run_chirality_check(self, records: list[dict[str, Any]]) -> int:
        """Phase 1: Chirality invariant test via diagonal cross product.

        For a CW quad [P0=TL, P1=TR, P2=BR, P3=BL] in Y-down image space:
            A = P0→P2,  B = P1→P3
            cross = Ax*By - Ay*Bx  must be > 0

        Note: A 180° index rotation produces the same positive cross product,
        so this test catches mirror flips but NOT 180° orientation errors.
        That is handled by _run_anchor_check.

        Returns:
            Number of TAG records that fail the chirality invariant.
        """
        failures = 0
        for rec in records:
            # Only TAG records with a full 4-corner quad are checked.
            if rec.get("record_type") != "TAG":
                continue
            corners = rec.get("corners")
            if not corners or len(corners) < 4:
                continue
            p0, p1, p2, p3 = corners[0], corners[1], corners[2], corners[3]
            ax = p2[0] - p0[0]
            ay = p2[1] - p0[1]
            bx = p3[0] - p1[0]
            by = p3[1] - p1[1]
            # Non-positive cross product => mirrored (or degenerate) quad.
            if ax * by - ay * bx <= 0:
                failures += 1
                logger.warning(
                    "Chirality failure",
                    image_id=rec.get("image_id"),
                    tag_id=rec.get("tag_id"),
                )
        return failures

    def _run_anchor_check(self, records: list[dict[str, Any]]) -> tuple[int, bool]:
        """Phase 2: 3D-to-2D projection anchor test.

        Projects the TL corner of the tag (using the stored pose) and measures
        its distance to annotated corners[0]. Sub-pixel accuracy is expected.

        If the projected TL lands near corners[2] (BR) instead, this is a
        DictionaryOrientationError: the texture is 180° out of phase with the
        3D geometry.

        Returns:
            (failure_count, dictionary_orientation_error)
        """
        _PASS_THRESHOLD = 0.5  # sub-pixel, px
        _FAIL_THRESHOLD = 10.0  # significant error, px

        failures = 0
        dict_orient_error = False

        for rec in records:
            if rec.get("record_type") != "TAG":
                continue
            corners = rec.get("corners")
            pos = rec.get("position")
            quat_wxyz = rec.get("rotation_quaternion")
            k_mat = rec.get("k_matrix")
            tag_size_mm = rec.get("tag_size_mm")
            dist_model = rec.get("distortion_model", "none")
            dist_coeffs = rec.get("distortion_coeffs", [])

            # Skip records missing any field the projection needs.
            if (
                not corners
                or pos is None
                or quat_wxyz is None
                or k_mat is None
                or tag_size_mm is None
            ):
                continue
            if len(corners) < 4:
                continue

            half = float(tag_size_mm) / 2000.0  # mm → m, half-size

            # Center-Origin, Y-down convention: TL = (-half, -half, 0)
            local_tl = np.array([-half, -half, 0.0])
            R = quaternion_wxyz_to_matrix(quat_wxyz)
            t = np.array(pos, dtype=float)
            p_cam = R @ local_tl + t

            if p_cam[2] <= 0:
                continue  # Behind camera

            k = np.array(k_mat, dtype=float)

            # Apply lens distortion if model is specified
            x_norm = p_cam[0] / p_cam[2]
            y_norm = p_cam[1] / p_cam[2]
            xd, yd = apply_distortion_by_model(
                np.array([x_norm]), np.array([y_norm]), dist_coeffs, dist_model
            )

            # Bake the distorted normalized coordinates through the K-matrix.
            x_proj = k[0, 0] * xd[0] + k[0, 2]
            y_proj = k[1, 1] * yd[0] + k[1, 2]

            c0 = corners[0]
            dist0 = float(np.hypot(x_proj - c0[0], y_proj - c0[1]))

            if dist0 > _PASS_THRESHOLD:
                failures += 1
                logger.warning(
                    "Anchor projection failure",
                    image_id=rec.get("image_id"),
                    tag_id=rec.get("tag_id"),
                    dist_to_corner0=round(dist0, 2),
                )

                # Large miss on TL but a near-exact hit on BR implies the
                # texture is rotated 180° relative to the 3D geometry.
                if dist0 > _FAIL_THRESHOLD:
                    c2 = corners[2]
                    dist2 = float(np.hypot(x_proj - c2[0], y_proj - c2[1]))
                    if dist2 < _PASS_THRESHOLD:
                        dict_orient_error = True
                        logger.error(
                            "DictionaryOrientationError: texture 180° out of phase",
                            image_id=rec.get("image_id"),
                            tag_id=rec.get("tag_id"),
                            dist_to_corner0=round(dist0, 2),
                            dist_to_corner2=round(dist2, 2),
                        )

        return failures, dict_orient_error

    def _run_margin_check(self, records: list[dict[str, Any]], margin_px: int) -> int:
        """Sanity check: no corner marked VISIBLE (v=2) should be inside the margin zone.

        A v=2 flag on a geometrically marginal corner means the visibility
        computation diverged from the geometric truth — catching this early
        prevents corrupted evaluation labels from reaching MLOps teams.

        Returns:
            Number of corners where the stored v=2 flag contradicts the geometry.
        """
        # A zero margin means there is no forbidden border zone to violate.
        if margin_px == 0:
            return 0

        violations = 0
        for rec in records:
            corners = rec.get("corners")
            vis_flags = rec.get("corners_visibility")
            resolution = rec.get("resolution")
            if not corners or not vis_flags or not resolution or len(resolution) < 2:
                continue
            w, h = int(resolution[0]), int(resolution[1])
            # Count VISIBLE-flagged corners that geometrically sit inside
            # the border margin on any side of the image.
            rec_violations = sum(
                1
                for (x, y), v in zip(corners, vis_flags, strict=True)
                if v == KeypointVisibility.VISIBLE
                and (x < margin_px or x >= w - margin_px or y < margin_px or y >= h - margin_px)
            )
            if rec_violations:
                violations += rec_violations
                logger.error(
                    "Margin violation: corners marked VISIBLE inside eval_margin_px zone",
                    image_id=rec.get("image_id"),
                    tag_id=rec.get("tag_id"),
                    violating_corners=rec_violations,
                    margin_px=margin_px,
                )
        return violations
Functions
__init__
__init__(dataset_path: Path) -> None

Initialize the DatasetAuditor.

Parameters:

Name Type Description Default
dataset_path Path

Directory containing the images and metadata.

required
Source code in src/render_tag/audit/auditor.py
233
234
235
236
237
238
239
240
def __init__(self, dataset_path: Path) -> None:
    """Initialize the DatasetAuditor.

    Args:
        dataset_path: Directory containing the images and metadata.
    """
    # The reader handles high-speed ingestion of the dataset's metadata.
    self.reader = DatasetReader(dataset_path)
    self.dataset_path = dataset_path
run_audit
run_audit(
    gate_config: QualityGateConfig | None = None,
) -> AuditResult

Execute all audit passes and evaluate quality gates.

Parameters:

Name Type Description Default
gate_config QualityGateConfig | None

Optional configuration for metric-based pass/fail gates.

None

Returns:

Type Description
AuditResult

An AuditResult containing the full report and gate status.

Source code in src/render_tag/audit/auditor.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
def run_audit(self, gate_config: QualityGateConfig | None = None) -> AuditResult:
    """Execute all audit passes and evaluate quality gates.

    Args:
        gate_config: Optional configuration for metric-based pass/fail gates.

    Returns:
        An AuditResult containing the full report and gate status.
    """
    # Rich detections feed the statistical audits; raw records feed the
    # per-tag geometric integrity checks.
    df = self.reader.load_rich_detections()
    raw_records, eval_ctx = self.reader.load_raw_records()

    def get_stats(col):
        # Missing columns degrade to an all-zero summary instead of raising.
        if col not in df.columns:
            return DistributionStats(min=0, max=0, mean=0, std=0, median=0)
        s = df[col]
        # `or 0` guards against None aggregates (empty or all-null column).
        return DistributionStats(
            min=float(s.min() or 0),
            max=float(s.max() or 0),
            mean=float(s.mean() or 0),
            std=float(s.std() or 0),
            median=float(s.median() or 0),
        )

    geom = GeometricAudit(
        distance=get_stats("distance"),
        incidence_angle=get_stats("angle_of_incidence"),
        ppm=get_stats("ppm"),
        tag_count=len(df),
        image_count=df["image_id"].n_unique() if "image_id" in df.columns else 0,
    )
    env = EnvironmentalAudit(lighting_intensity=get_stats("lighting_intensity"))

    # Integrity passes over the raw per-tag records.
    chirality_failures = self._run_chirality_check(raw_records)
    orientation_failures, dict_orient_error = self._run_anchor_check(raw_records)
    # Margin width comes from the dataset's evaluation_context header
    # (0 when the header is absent, e.g. legacy v1 files).
    margin_px = int(eval_ctx.get("photometric_margin_px", 0))
    margin_violations = self._run_margin_check(raw_records, margin_px)

    integrity = IntegrityAudit(
        # A negative distance is geometrically impossible — count as corrupt.
        impossible_poses=int(df.filter(pl.col("distance") < 0).height)
        if "distance" in df.columns
        else 0,
        chirality_failures=chirality_failures,
        orientation_failures=orientation_failures,
        dictionary_orientation_error=dict_orient_error,
        margin_violations=margin_violations,
    )

    report = AuditReport(
        dataset_name=self.dataset_path.name,
        timestamp=datetime.now(UTC).isoformat(),
        geometric=geom,
        environmental=env,
        integrity=integrity,
        score=self._calculate_score(geom, env, integrity),
    )

    gate_passed = True
    gate_failures = []
    if gate_config:
        # Simple gate logic for tests
        pass

    # Any hard integrity failure quarantines the dataset.
    quarantined = chirality_failures > 0 or orientation_failures > 0 or margin_violations > 0

    if chirality_failures > 0:
        gate_failures.append(f"CHIRALITY: {chirality_failures} tag(s) have wrong winding order")
        gate_passed = False
    if orientation_failures > 0:
        msg = f"ORIENTATION: {orientation_failures} tag(s) fail 3D anchor projection"
        if dict_orient_error:
            msg += " [DictionaryOrientationError: texture is 180° out of phase]"
        gate_failures.append(msg)
        gate_passed = False
    if margin_violations > 0:
        gate_failures.append(
            f"MARGIN: {margin_violations} corner(s) marked VISIBLE inside the "
            f"{margin_px}px eval margin — projection bug detected"
        )
        gate_passed = False

    return AuditResult(
        report=report,
        gate_passed=gate_passed,
        gate_failures=gate_failures,
        quarantined=quarantined,
    )

DatasetReader

Handles high-speed ingestion of datasets.

Source code in src/render_tag/audit/auditor.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
class DatasetReader:
    """Handles high-speed ingestion of datasets."""

    def __init__(self, dataset_path: Path) -> None:
        self.dataset_path = dataset_path
        self.tags_csv = dataset_path / "tags.csv"

    def load_rich_detections(self) -> pl.DataFrame:
        """Load detections, preferring rich JSON truth over the legacy CSV."""
        truth_file = self.dataset_path / "rich_truth.json"
        if pl is None:
            raise ImportError("polars required")
        if truth_file.exists():
            with open(truth_file) as fh:
                payload = json.load(fh)
            return pl.DataFrame(unwrap_rich_truth(payload))
        # Fall back to the flat CSV format when no rich truth exists.
        if not self.tags_csv.exists():
            raise FileNotFoundError(f"tags.csv not found in {self.dataset_path}")
        return pl.read_csv(self.tags_csv)

    def load_raw_records(self) -> tuple[list[dict[str, Any]], dict[str, Any]]:
        """Load raw JSON records and the evaluation_context header.

        Returns:
            A (records, evaluation_context) pair; the context is an empty
            dict for v1 (legacy bare-array) files.
        """
        truth_file = self.dataset_path / "rich_truth.json"
        if not truth_file.exists():
            return [], {}
        payload = json.loads(truth_file.read_text())
        if not isinstance(payload, dict):
            # v1 legacy layout: bare array of records, no header.
            return payload, {}
        return payload.get("records", []), payload.get("evaluation_context", {})
Functions
load_raw_records
load_raw_records() -> tuple[
    list[dict[str, Any]], dict[str, Any]
]

Load raw JSON records and the evaluation_context header.

Returns:

Type Description
list[dict[str, Any]]

(records, evaluation_context) where evaluation_context is an empty

dict[str, Any]

dict for v1 (legacy bare-array) files.

Source code in src/render_tag/audit/auditor.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def load_raw_records(self) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Load raw JSON records plus the evaluation_context header.

    Returns:
        A (records, evaluation_context) pair; the context is an empty dict
        for v1 (legacy bare-array) files.
    """
    truth_file = self.dataset_path / "rich_truth.json"
    if not truth_file.exists():
        return [], {}
    payload = json.loads(truth_file.read_text())
    if not isinstance(payload, dict):
        # v1 legacy layout: bare array of records, no header.
        return payload, {}
    return payload.get("records", []), payload.get("evaluation_context", {})

DictionaryOrientationError

Bases: ValueError

Raised when the texture payload is 180° out of phase with the 3D geometry.

The projected 3D TL anchor lands on annotated corners[2] (BR) instead of corners[0] (TL), which proves the UV mapping or index convention is inverted.

Source code in src/render_tag/audit/auditor.py
39
40
41
42
43
44
class DictionaryOrientationError(ValueError):
    """Signals that the tag texture payload is rotated 180° relative to the 3D geometry.

    Detection: projecting the 3D TL anchor lands on annotated corners[2]
    (BR) rather than corners[0] (TL), proving the UV mapping or corner
    index convention is inverted.
    """

DistributionStats

Bases: BaseModel

Statistical distribution summary.

Source code in src/render_tag/audit/auditor.py
50
51
52
53
54
55
56
57
class DistributionStats(BaseModel):
    """Statistical distribution summary."""

    # Summary statistics for a single scalar metric across the dataset.
    min: float
    max: float
    mean: float
    std: float
    median: float

EnvironmentalAudit

Bases: BaseModel

Audit results for environmental variance.

Source code in src/render_tag/audit/auditor.py
70
71
72
73
74
class EnvironmentalAudit(BaseModel):
    """Audit results for environmental variance."""

    # Distribution of the 'lighting_intensity' column.
    lighting_intensity: DistributionStats
    # Optional contrast distribution; None when not computed.
    contrast: DistributionStats | None = None

GateRule

Bases: BaseModel

A single rule for a quality gate.

Source code in src/render_tag/audit/auditor.py
100
101
102
103
104
105
106
107
108
class GateRule(BaseModel):
    """A single rule for a quality gate."""

    # Name of the metric this rule constrains.
    metric: str
    # Optional lower bound for the metric (no lower check when None).
    min: float | None = None
    # Optional upper bound for the metric (no upper check when None).
    max: float | None = None
    # NOTE(review): presumably a critical violation fails the gate while a
    # non-critical one warns — confirm against the gate evaluation logic.
    critical: bool = True
    # Optional message overrides for warnings/errors.
    warning_msg: str | None = None
    error_msg: str | None = None

GeometricAudit

Bases: BaseModel

Audit results for geometric coverage.

Source code in src/render_tag/audit/auditor.py
60
61
62
63
64
65
66
67
class GeometricAudit(BaseModel):
    """Audit results for geometric coverage."""

    # Distribution of the 'distance' column (camera-to-tag).
    distance: DistributionStats
    # Distribution of the 'angle_of_incidence' column.
    incidence_angle: DistributionStats
    # Distribution of the 'ppm' column; None when unavailable.
    ppm: DistributionStats | None = None
    # Total tag detections and number of distinct images audited.
    tag_count: int
    image_count: int

IntegrityAudit

Bases: BaseModel

Audit results for data integrity.

Source code in src/render_tag/audit/auditor.py
77
78
79
80
81
82
83
84
85
86
class IntegrityAudit(BaseModel):
    """Audit results for data integrity."""

    # All counters default to 0 so a clean dataset needs no explicit values.
    orphaned_tags: int = 0
    # Poses that are geometrically impossible (e.g. negative distance).
    impossible_poses: int = 0
    corrupted_frames: int = 0
    # Tags whose corner winding order is wrong.
    chirality_failures: int = 0
    # Tags failing the 3D anchor projection check.
    orientation_failures: int = 0
    # True when the texture payload is 180° out of phase with the geometry.
    dictionary_orientation_error: bool = False
    # Corners marked VISIBLE inside the photometric eval margin.
    margin_violations: int = 0

QualityGateConfig

Bases: BaseModel

Configuration for quality gates.

Source code in src/render_tag/audit/auditor.py
111
112
113
114
class QualityGateConfig(BaseModel):
    """Configuration for quality gates."""

    # Rules evaluated against the audit report; empty means no metric gates.
    rules: list[GateRule] = Field(default_factory=list)

TelemetryAuditor

Collects and analyzes worker telemetry using Polars.

Source code in src/render_tag/audit/auditor.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
class TelemetryAuditor:
    """Collects and analyzes worker telemetry using Polars."""

    # Ring-buffer capacity: deque(maxlen=...) silently evicts the oldest
    # entries once this many records have been collected.
    MAX_RECORDS = 10_000

    def __init__(self) -> None:
        # Bounded FIFO of raw telemetry dicts (schema defined in add_entry).
        self.records: deque[dict[str, Any]] = deque(maxlen=self.MAX_RECORDS)

    def add_entry(self, worker_id: str, telemetry: Telemetry, event_type: str = "heartbeat") -> None:
        """Adds a telemetry record."""
        # NOTE(review): timestamp is naive local time here, while run_audit
        # uses datetime.now(UTC) — confirm whether UTC is intended.
        entry = {
            "timestamp": datetime.now(),
            "worker_id": worker_id,
            "event_type": event_type,
            "vram_used_mb": telemetry.vram_used_mb,
            "vram_total_mb": telemetry.vram_total_mb,
            "ram_used_mb": telemetry.ram_used_mb,
            "cpu_usage": telemetry.cpu_usage_percent,
            "uptime": telemetry.uptime_seconds,
            "state_hash": telemetry.state_hash,
        }
        self.records.append(entry)

    def get_dataframe(self) -> pl.DataFrame | None:
        # Returns None when polars is unavailable; an empty DataFrame when
        # polars is present but no records have been collected yet.
        if not self.records or pl is None:
            return pl.DataFrame() if pl else None
        return pl.DataFrame(self.records)

    def save_csv(self, output_path: Path) -> None:
        # Best-effort: writes nothing when there is no data to persist.
        df = self.get_dataframe()
        if df is not None and not df.is_empty():
            output_path.parent.mkdir(parents=True, exist_ok=True)
            df.write_csv(output_path)
            logger.info("Telemetry saved", path=str(output_path))

    def analyze_throughput(self) -> dict[str, Any]:
        """Calculates throughput statistics."""
        from typing import cast

        df = self.get_dataframe()
        if df is None or df.is_empty():
            return {}
        # cast() is for the type checker only — no runtime conversion.
        min_ts = cast(datetime, df["timestamp"].min())
        max_ts = cast(datetime, df["timestamp"].max())
        duration = (max_ts - min_ts).total_seconds()
        total_events = len(df)
        return {
            "total_duration_sec": duration,
            "event_count": total_events,
            # `or 0` guards the None aggregate of an all-null column.
            "avg_vram_mb": float(df["vram_used_mb"].mean() or 0),
            "max_vram_mb": float(df["vram_used_mb"].max() or 0),
        }
Functions
add_entry
add_entry(
    worker_id: str,
    telemetry: Telemetry,
    event_type: str = "heartbeat",
)

Adds a telemetry record.

Source code in src/render_tag/audit/auditor.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def add_entry(self, worker_id: str, telemetry: Telemetry, event_type: str = "heartbeat"):
    """Append one telemetry sample to the bounded record buffer."""
    self.records.append(
        {
            "timestamp": datetime.now(),
            "worker_id": worker_id,
            "event_type": event_type,
            "vram_used_mb": telemetry.vram_used_mb,
            "vram_total_mb": telemetry.vram_total_mb,
            "ram_used_mb": telemetry.ram_used_mb,
            "cpu_usage": telemetry.cpu_usage_percent,
            "uptime": telemetry.uptime_seconds,
            "state_hash": telemetry.state_hash,
        }
    )
analyze_throughput
analyze_throughput() -> dict[str, Any]

Calculates throughput statistics.

Source code in src/render_tag/audit/auditor.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def analyze_throughput(self) -> dict[str, Any]:
    """Summarize event rate and VRAM usage from the collected telemetry."""
    from typing import cast

    frame = self.get_dataframe()
    if frame is None or frame.is_empty():
        return {}
    # cast() satisfies the type checker; min/max already yield datetimes.
    first = cast(datetime, frame["timestamp"].min())
    last = cast(datetime, frame["timestamp"].max())
    return {
        "total_duration_sec": (last - first).total_seconds(),
        "event_count": len(frame),
        # `or 0` guards against None aggregates of an all-null column.
        "avg_vram_mb": float(frame["vram_used_mb"].mean() or 0),
        "max_vram_mb": float(frame["vram_used_mb"].max() or 0),
    }

Functions

The 3D rendering driver that runs inside the Blender environment.

render_tag.backend.engine

Unified rendering engine for render-tag.

Combines the RenderFacade (Blender abstraction) and the execution loop into a single, high-performance module.

Classes

CyclesRenderStrategy

Configures the high-fidelity Cycles path tracer.

Source code in src/render_tag/backend/engine.py
81
82
83
84
85
class CyclesRenderStrategy:
    """Selects the high-fidelity Cycles path tracer."""

    def configure(self) -> None:
        scene = bridge.bpy.context.scene
        scene.render.engine = "CYCLES"

EeveeRenderStrategy

Configures the real-time Eevee engine.

Source code in src/render_tag/backend/engine.py
88
89
90
91
92
93
94
95
class EeveeRenderStrategy:
    """Selects the real-time Eevee engine."""

    def configure(self) -> None:
        try:
            # Newer Blender builds expose the "Next" identifier.
            bridge.bpy.context.scene.render.engine = "BLENDER_EEVEE_NEXT"
        except Exception:
            # Older builds only accept the legacy name.
            bridge.bpy.context.scene.render.engine = "BLENDER_EEVEE"

RenderContext dataclass

Groups all necessary state for executing a single render task.

Source code in src/render_tag/backend/engine.py
57
58
59
60
61
62
63
64
65
66
67
68
69
@dataclass
class RenderContext:
    """Groups all necessary state for executing a single render task."""

    # Destination directory for rendered frames and metadata.
    output_dir: Path
    # Engine name ('cycles', 'eevee', 'workbench').
    renderer_mode: str
    # Output writers for the various ground-truth formats.
    csv_writer: CSVWriter
    coco_writer: COCOWriter
    rich_writer: RichTruthWriter
    provenance_writer: ProvenanceWriter
    # Master seed recorded into provenance for reproducibility.
    global_seed: int
    # Optional pre-bound logger (supports .bind(...)); a default is used when None.
    logger: Any = None
    # NOTE(review): presumably skips the visibility pass when True — confirm
    # against the executor.
    skip_visibility: bool = False

RenderEngineStrategy

Bases: Protocol

Protocol for rendering engine configuration strategies.

Source code in src/render_tag/backend/engine.py
72
73
74
75
76
77
78
@runtime_checkable
class RenderEngineStrategy(Protocol):
    """Protocol for rendering engine configuration strategies.

    Concrete strategies (Cycles/Eevee/Workbench) assign the engine on
    ``bpy.context.scene.render``.
    """

    def configure(self) -> None:
        """Configure the Blender scene for this specific engine."""
        ...
Functions
configure
configure() -> None

Configure the Blender scene for this specific engine.

Source code in src/render_tag/backend/engine.py
76
77
78
def configure(self) -> None:
    """Configure the Blender scene for this specific engine."""
    # Protocol stub: concrete strategies assign the scene's render engine.
    ...

RenderFacade

High-level interface for rendering fiducial tag scenes.

Attributes:

Name Type Description
renderer_mode

The rendering engine to use ('cycles', 'eevee', 'workbench').

logger

Logger for tracking rendering progress.

config

Detailed configuration for the renderer.

Source code in src/render_tag/backend/engine.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
class RenderFacade:
    """High-level interface for rendering fiducial tag scenes.

    Attributes:
        renderer_mode: The rendering engine to use ('cycles', 'eevee', 'workbench').
        logger: Logger for tracking rendering progress.
        config: Detailed configuration for the renderer.
    """

    def __init__(
        self,
        renderer_mode: str = "cycles",
        logger: logging.Logger | None = None,
        config: RendererConfig | None = None,
    ):
        """Initialize the RenderFacade.

        Args:
            renderer_mode: Name of the engine to use.
            logger: Optional pre-configured logger.
            config: Optional renderer configuration presets.
        """
        # Ensure bridge is stabilized before using it
        if not bridge.bpy or not bridge.bproc:
            bridge.stabilize()

        self.renderer_mode = renderer_mode
        self.logger = logger or get_logger(__name__)
        self.config = config
        # Unknown modes fall back to Cycles in _configure_engine().
        self._engine_strategies = {
            "cycles": CyclesRenderStrategy(),
            "eevee": EeveeRenderStrategy(),
            "workbench": WorkbenchRenderStrategy(),
        }
        self._configure_engine()

    def _configure_engine(self):
        """Standardizes engine-specific settings."""
        strategy = self._engine_strategies.get(self.renderer_mode, CyclesRenderStrategy())
        strategy.configure()

        # Quality knobs below are Cycles-specific and require a config.
        if self.renderer_mode == "cycles" and self.config:
            # Apply CV-Safe Adaptive Sampling
            bridge.bproc.renderer.set_noise_threshold(self.config.noise_threshold)
            bridge.bproc.renderer.set_max_amount_of_samples(self.config.max_samples)

            # Apply Denoising with Albedo/Normal guidance
            if self.config.enable_denoising:
                self.logger.info(f"Enabling {self.config.denoiser_type} denoiser")
                bridge.bproc.renderer.set_denoiser(self.config.denoiser_type)

                if self.config.denoiser_type == "INTEL":
                    # Intel OIDN performs much better with Albedo and Normal guidance
                    # for preserving high-frequency edges like tag corners.
                    bridge.bproc.renderer.enable_diffuse_color_output()
                    bridge.bproc.renderer.enable_normals_output()

            # Apply CV-Safe Light Paths
            bridge.bproc.renderer.set_light_bounces(
                diffuse_bounces=self.config.diffuse_bounces,
                glossy_bounces=self.config.glossy_bounces,
                transmission_bounces=self.config.transmission_bounces,
                transparent_max_bounces=self.config.transparent_bounces,
                volume_bounces=0,  # Volumes are expensive and usually not needed for tags
                max_bounces=self.config.total_bounces,
            )
            # BlenderProc doesn't wrap caustics, set via bpy directly
            bridge.bpy.context.scene.cycles.caustics_reflective = self.config.enable_caustics
            bridge.bpy.context.scene.cycles.caustics_refractive = self.config.enable_caustics

        # Plumb CPU thread budget from environment if available
        # Set in render_tag.core.utils.get_subprocess_env
        thread_budget = os.environ.get("BLENDER_CPU_THREADS")
        if thread_budget and thread_budget.isdigit():
            t = int(thread_budget)
            self.logger.info(f"Setting Blender render threads to {t}")
            bridge.bpy.context.scene.render.threads_mode = "FIXED"
            bridge.bpy.context.scene.render.threads = t

    def reset_volatile_state(self):
        """Clears objects from the scene but keeps heavy environment assets."""
        # Return pooled objects first, then clear animation keyframes.
        global_pool.release_all()
        bridge.bproc.utility.reset_keyframes()

    def setup_world(self, world_recipe: WorldRecipe):
        """Resolves world parameters and environment state."""
        # Optional HDRI environment map.
        hdri_path = world_recipe.background_hdri
        if hdri_path:
            setup_background(Path(hdri_path))

        # Handle Background Texture Plane
        texture_path = world_recipe.texture_path
        if texture_path:
            # Use managed background plane from pool
            bg_plane = global_pool.get_background_plane()
            setup_floor_material(
                bg_plane,
                texture_path=texture_path,
                scale=world_recipe.texture_scale,
                rotation=world_recipe.texture_rotation,
            )

        setup_lighting(world_recipe.lights)

    def spawn_objects(self, object_recipes: list[ObjectRecipe]) -> list[Any]:
        """Creates subjects (tags, boards, etc.) using decoupled AssetBuilders.

        This method implements Scene Graph Deduplication: if a BOARD with a
        composite texture is present, it suppresses the generation of individual
        TAG objects that would otherwise cause Z-fighting.
        """
        tag_objects = []

        # Check if any BOARD with a texture exists in the recipe
        has_composite_board = any(
            obj.type == "BOARD" and obj.texture_path for obj in object_recipes
        )

        for obj_recipe in object_recipes:
            # Suppress individual TAGs if a composite BOARD is handling the rendering
            if obj_recipe.type == "TAG" and has_composite_board:
                continue

            try:
                assets = default_registry.build_object(obj_recipe)
                tag_objects.extend(assets)
            except KeyError as e:
                self.logger.critical(f"FATAL: Missing builder for subject type: {obj_recipe.type}")
                raise e

        return tag_objects

    def render_camera(self, camera_recipe: CameraRecipe) -> dict[str, Any]:
        """Configures a camera and renders the image.

        Args:
            camera_recipe: Resolved recipe containing pose, intrinsics, and sensor settings.

        Returns:
            A dictionary containing the 'img' (ndarray) and 'segmap' (metadata).
        """
        # Place the camera and apply sensor dynamics for this pose.
        pose_matrix = bridge.np.array(camera_recipe.transform_matrix)
        bridge.bproc.camera.add_camera_pose(pose_matrix, frame=0)
        setup_sensor_dynamics(pose_matrix, camera_recipe.sensor_dynamics)

        # Apply intrinsics (Resolution, FOV, etc.)
        set_camera_intrinsics(camera_recipe)

        # Depth of field: a truthy f-stop enables DoF, otherwise switch it off.
        cam_data = bridge.bpy.context.scene.camera.data
        fstop = camera_recipe.fstop
        if fstop:
            cam_data.dof.use_dof = True
            cam_data.dof.aperture_fstop = fstop
            focus_distance = camera_recipe.focus_distance
            if focus_distance:
                cam_data.dof.focus_distance = focus_distance
        else:
            cam_data.dof.use_dof = False

        # Segmentation output is not enabled for Eevee/Workbench engines.
        if bridge.bpy.context.scene.render.engine not in (
            "BLENDER_EEVEE",
            "BLENDER_EEVEE_NEXT",
            "BLENDER_WORKBENCH",
        ):
            bridge.bproc.renderer.enable_segmentation_output(default_values={"category_id": 0})

        self.logger.info("Starting BlenderProc render call...")
        data = bridge.bproc.renderer.render()
        self.logger.info("BlenderProc render call completed.")
        img = data["colors"][0]
        # Segmentation may be absent when it was not enabled above.
        segmap = data.get("segmentation", [None])[0]

        # Post-render lens warps: spherical fisheye takes precedence over the
        # overscan-based distortion remap. The segmap is warped with
        # nearest-neighbor sampling to keep its labels discrete.
        intrinsics = camera_recipe.intrinsics
        if intrinsics.fov_spherical is not None and intrinsics.distortion_coeffs:
            self.logger.info("Applying post-render spherical equidistant warp...")
            warp_maps = compute_spherical_distortion_maps(
                intrinsics.k_matrix,
                intrinsics.resolution,
                intrinsics.distortion_coeffs,
                intrinsics.fov_spherical,
                intrinsics.resolution_spherical,
            )
            img = remap_image(img, *warp_maps)
            if segmap is not None:
                segmap = remap_image(segmap, *warp_maps, nearest_neighbor=True)
        elif intrinsics.k_matrix_overscan is not None and intrinsics.distortion_coeffs:
            self.logger.info("Applying post-render lens distortion warp...")
            warp_maps = compute_distortion_maps(
                intrinsics.k_matrix_overscan,
                intrinsics.k_matrix,
                intrinsics.resolution,
                intrinsics.distortion_coeffs,
                intrinsics.distortion_model,
            )
            img = remap_image(img, *warp_maps)
            if segmap is not None:
                segmap = remap_image(segmap, *warp_maps, nearest_neighbor=True)

        # Sensor simulation: dynamic range, tone mapping, then optional noise.
        img = apply_sensor_dr(img, camera_recipe.dynamic_range_db)
        img = apply_tone_mapping(img, camera_recipe.tone_mapping)

        if camera_recipe.sensor_noise:
            img = apply_parametric_noise(img, camera_recipe.sensor_noise)

        return {"img": img, "segmap": segmap}
Functions
__init__
__init__(
    renderer_mode: str = "cycles",
    logger: Logger | None = None,
    config: RendererConfig | None = None,
)

Initialize the RenderFacade.

Parameters:

Name Type Description Default
renderer_mode str

Name of the engine to use.

'cycles'
logger Logger | None

Optional pre-configured logger.

None
config RendererConfig | None

Optional renderer configuration presets.

None
Source code in src/render_tag/backend/engine.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def __init__(
    self,
    renderer_mode: str = "cycles",
    logger: logging.Logger | None = None,
    config: RendererConfig | None = None,
):
    """Initialize the RenderFacade.

    Args:
        renderer_mode: Name of the engine to use.
        logger: Optional pre-configured logger.
        config: Optional renderer configuration presets.
    """
    # Ensure bridge is stabilized before using it
    if not bridge.bpy or not bridge.bproc:
        bridge.stabilize()

    self.renderer_mode = renderer_mode
    self.logger = logger or get_logger(__name__)
    self.config = config
    # Unknown modes fall back to Cycles in _configure_engine().
    self._engine_strategies = {
        "cycles": CyclesRenderStrategy(),
        "eevee": EeveeRenderStrategy(),
        "workbench": WorkbenchRenderStrategy(),
    }
    self._configure_engine()
render_camera
render_camera(
    camera_recipe: CameraRecipe,
) -> dict[str, Any]

Configures a camera and renders the image.

Parameters:

Name Type Description Default
camera_recipe CameraRecipe

Resolved recipe containing pose, intrinsics, and sensor settings.

required

Returns:

Type Description
dict[str, Any]

A dictionary containing the 'img' (ndarray) and 'segmap' (metadata).

Source code in src/render_tag/backend/engine.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
def render_camera(self, camera_recipe: CameraRecipe) -> dict[str, Any]:
    """Configures a camera and renders the image.

    Args:
        camera_recipe: Resolved recipe containing pose, intrinsics, and sensor settings.

    Returns:
        A dictionary containing the 'img' (ndarray) and 'segmap' (metadata).
    """
    # Place the camera and apply sensor dynamics for this pose.
    pose_matrix = bridge.np.array(camera_recipe.transform_matrix)
    bridge.bproc.camera.add_camera_pose(pose_matrix, frame=0)
    setup_sensor_dynamics(pose_matrix, camera_recipe.sensor_dynamics)

    # Apply intrinsics (Resolution, FOV, etc.)
    set_camera_intrinsics(camera_recipe)

    # Depth of field: a truthy f-stop enables DoF, otherwise switch it off.
    cam_data = bridge.bpy.context.scene.camera.data
    fstop = camera_recipe.fstop
    if fstop:
        cam_data.dof.use_dof = True
        cam_data.dof.aperture_fstop = fstop
        focus_distance = camera_recipe.focus_distance
        if focus_distance:
            cam_data.dof.focus_distance = focus_distance
    else:
        cam_data.dof.use_dof = False

    # Segmentation output is not enabled for Eevee/Workbench engines.
    if bridge.bpy.context.scene.render.engine not in (
        "BLENDER_EEVEE",
        "BLENDER_EEVEE_NEXT",
        "BLENDER_WORKBENCH",
    ):
        bridge.bproc.renderer.enable_segmentation_output(default_values={"category_id": 0})

    self.logger.info("Starting BlenderProc render call...")
    data = bridge.bproc.renderer.render()
    self.logger.info("BlenderProc render call completed.")
    img = data["colors"][0]
    # Segmentation may be absent when it was not enabled above.
    segmap = data.get("segmentation", [None])[0]

    # Post-render lens warps: spherical fisheye takes precedence over the
    # overscan-based distortion remap. The segmap is warped with
    # nearest-neighbor sampling to keep its labels discrete.
    intrinsics = camera_recipe.intrinsics
    if intrinsics.fov_spherical is not None and intrinsics.distortion_coeffs:
        self.logger.info("Applying post-render spherical equidistant warp...")
        warp_maps = compute_spherical_distortion_maps(
            intrinsics.k_matrix,
            intrinsics.resolution,
            intrinsics.distortion_coeffs,
            intrinsics.fov_spherical,
            intrinsics.resolution_spherical,
        )
        img = remap_image(img, *warp_maps)
        if segmap is not None:
            segmap = remap_image(segmap, *warp_maps, nearest_neighbor=True)
    elif intrinsics.k_matrix_overscan is not None and intrinsics.distortion_coeffs:
        self.logger.info("Applying post-render lens distortion warp...")
        warp_maps = compute_distortion_maps(
            intrinsics.k_matrix_overscan,
            intrinsics.k_matrix,
            intrinsics.resolution,
            intrinsics.distortion_coeffs,
            intrinsics.distortion_model,
        )
        img = remap_image(img, *warp_maps)
        if segmap is not None:
            segmap = remap_image(segmap, *warp_maps, nearest_neighbor=True)

    # Sensor simulation: dynamic range, tone mapping, then optional noise.
    img = apply_sensor_dr(img, camera_recipe.dynamic_range_db)
    img = apply_tone_mapping(img, camera_recipe.tone_mapping)

    if camera_recipe.sensor_noise:
        img = apply_parametric_noise(img, camera_recipe.sensor_noise)

    return {"img": img, "segmap": segmap}
reset_volatile_state
reset_volatile_state()

Clears objects from the scene but keeps heavy environment assets.

Source code in src/render_tag/backend/engine.py
184
185
186
187
def reset_volatile_state(self):
    """Drop per-scene objects while retaining cached environment assets."""
    # Return pooled objects first, then clear animation keyframes.
    global_pool.release_all()
    bridge.bproc.utility.reset_keyframes()
setup_world
setup_world(world_recipe: WorldRecipe)

Resolves world parameters and environment state.

Source code in src/render_tag/backend/engine.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def setup_world(self, world_recipe: WorldRecipe):
    """Apply the world recipe: HDRI background, textured plane, and lights."""
    # Optional HDRI environment map.
    if world_recipe.background_hdri:
        setup_background(Path(world_recipe.background_hdri))

    # Optional textured background plane, reused from the shared pool.
    if world_recipe.texture_path:
        plane = global_pool.get_background_plane()
        setup_floor_material(
            plane,
            texture_path=world_recipe.texture_path,
            scale=world_recipe.texture_scale,
            rotation=world_recipe.texture_rotation,
        )

    setup_lighting(world_recipe.lights)
spawn_objects
spawn_objects(
    object_recipes: list[ObjectRecipe],
) -> list[Any]

Creates subjects (tags, boards, etc.) using decoupled AssetBuilders.

This method implements Scene Graph Deduplication: if a BOARD with a composite texture is present, it suppresses the generation of individual TAG objects that would otherwise cause Z-fighting.

Source code in src/render_tag/backend/engine.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def spawn_objects(self, object_recipes: list[ObjectRecipe]) -> list[Any]:
    """Creates subjects (tags, boards, etc.) using decoupled AssetBuilders.

    Scene Graph Deduplication: when a BOARD carrying a composite texture is
    present it already renders every tag, so standalone TAG objects are
    skipped to avoid Z-fighting.
    """
    built: list[Any] = []

    # Does any BOARD in the recipe carry a composite texture?
    board_owns_tags = any(
        recipe.type == "BOARD" and recipe.texture_path for recipe in object_recipes
    )

    for recipe in object_recipes:
        # Skip standalone TAGs that the composite BOARD already renders.
        if board_owns_tags and recipe.type == "TAG":
            continue

        try:
            built.extend(default_registry.build_object(recipe))
        except KeyError as e:
            self.logger.critical(f"FATAL: Missing builder for subject type: {recipe.type}")
            raise e

    return built

WorkbenchRenderStrategy

Configures the fast Workbench engine for previews.

Source code in src/render_tag/backend/engine.py
 98
 99
100
101
102
class WorkbenchRenderStrategy:
    """Configures the fast Workbench engine for previews."""

    def configure(self) -> None:
        """Switch the active scene's render engine to Blender Workbench."""
        scene = bridge.bpy.context.scene
        scene.render.engine = "BLENDER_WORKBENCH"

Functions

execute_recipe

execute_recipe(
    recipe: SceneRecipe,
    ctx: RenderContext,
    seed: int | None = None,
) -> None

Execute a single scene recipe using the RenderContext.

Parameters:

Name Type Description Default
recipe SceneRecipe

The fully resolved scene description.

required
ctx RenderContext

Execution context (writers, output paths, etc.).

required
seed int | None

Optional override for reproducibility.

None
Source code in src/render_tag/backend/engine.py
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
def execute_recipe(
    recipe: SceneRecipe,
    ctx: RenderContext,
    seed: int | None = None,
) -> None:
    """Execute a single scene recipe using the RenderContext.

    Args:
        recipe: The fully resolved scene description.
        ctx: Execution context (writers, output paths, etc.).
        seed: Optional override for reproducibility.
    """
    scene_idx = recipe.scene_id

    # 1. Context-aware logger: prefer the context's logger, fall back to the
    #    module logger; the scene id doubles as the seed when none is given.
    effective_seed = scene_idx if seed is None else seed
    scene_logger = (ctx.logger or logger).bind(scene_id=scene_idx, seed=effective_seed)
    scene_logger.info(f"--- Executing Scene {scene_idx} ---")

    # 2. Build the scene graph and collect the subject objects.
    renderer, tag_objects = _setup_scene(recipe, ctx, scene_logger)

    cam_recipes = recipe.cameras
    # Resolution is taken from the first camera's intrinsics.
    res = cam_recipes[0].intrinsics.resolution

    # Provenance metadata recorded alongside every frame of this scene.
    provenance = {
        "git_hash": get_git_hash(),
        "timestamp": datetime.now(UTC).isoformat(),
        "recipe_snapshot": recipe.model_dump(),
        "seeds": {
            "global_seed": ctx.global_seed,
            "scene_seed": recipe.random_seed,
        },
    }

    # 3. Render each camera, then persist its image and ground truth.
    for cam_idx, cam_recipe in enumerate(cam_recipes):
        coco_img_id, image_name = _render_camera_and_save(
            renderer, cam_idx, cam_recipe, recipe, ctx, scene_logger, provenance, res
        )

        _extract_and_save_ground_truth(
            tag_objects, image_name, coco_img_id, res, ctx, scene_logger, cam_recipe
        )

        # Structured progress event (consumed by log aggregation).
        progress_payload = {
            "current": cam_idx + 1,
            "total": len(cam_recipes),
            "scene_id": scene_idx,
        }
        scene_logger.info(
            f"Scene {scene_idx} progress: {cam_idx + 1}/{len(cam_recipes)}",
            extra={"log_type": "progress", "payload": progress_payload},
        )

    scene_logger.info(f"✓ Rendered scene {scene_idx}")

render_tag.backend.worker_server

Classes

ZmqBackendServer

ZeroMQ-based rendering server for Blender workers with Dual-Socket Architecture.

Source code in src/render_tag/backend/worker_server.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
class ZmqBackendServer:
    """ZeroMQ-based rendering server for Blender workers with Dual-Socket Architecture.

    Channels:
        - Task socket (REP, ``port``): serial blocking commands
          (INIT / RENDER / RESET / SHUTDOWN), served by :meth:`run`.
        - Management socket (REP, ``mgmt_port``): STATUS heartbeats, served by a
          dedicated thread so health checks stay responsive during long renders.
        - Telemetry emitter (PUB, ``mgmt_port + 1000``): started lazily in
          :meth:`run`.
    """

    def __init__(
        self,
        port: int = 5555,
        shard_id: str = "main",
        seed: int = 42,
        job_spec: "JobSpec | None" = None,
        mgmt_port: int | None = None,
        memory_limit_mb: int | None = None,
        **mocks,
    ):
        """Bind task/management sockets and initialize shared worker state.

        Args:
            port: TCP port (localhost) for the task REP socket.
            shard_id: Identifier used in output filenames and the worker id.
            seed: Global seed used to derive per-scene render seeds.
            job_spec: Optional job configuration (e.g. camera eval margin for writers).
            mgmt_port: Management socket port; defaults to ``port + 100``.
            memory_limit_mb: Soft RSS limit; exceeding it triggers a preventative restart.
            **mocks: Optional ``bproc_mock`` / ``bpy_mock`` / ``math_mock`` test doubles
                forwarded to ``bridge.stabilize``.
        """
        self.port, self.shard_id, self.seed = port, shard_id, seed
        self.job_spec = job_spec
        self.mgmt_port = mgmt_port or (port + 100)
        self.memory_limit_mb = memory_limit_mb

        self.context = zmq.Context()

        # 1. Task Socket (REP) - Handles RENDER, INIT, SHUTDOWN
        # LINGER=0 so pending messages are dropped on close (shutdown never blocks).
        self.task_socket = self.context.socket(zmq.REP)
        self.task_socket.setsockopt(zmq.LINGER, 0)
        self.task_socket.bind(f"tcp://127.0.0.1:{port}")

        # 2. Management Socket (REP) - Handles STATUS (Telemetry)
        self.mgmt_socket = self.context.socket(zmq.REP)
        self.mgmt_socket.setsockopt(zmq.LINGER, 0)
        self.mgmt_socket.bind(f"tcp://127.0.0.1:{self.mgmt_port}")

        # Ensure dependencies are stabilized for rendering
        bridge.stabilize(mocks.get("bproc_mock"), mocks.get("bpy_mock"), mocks.get("math_mock"))

        self.running = False
        self._lock = threading.Lock()

        # Shared State (Protected by self._lock)
        self.status = WorkerStatus.IDLE
        self.renders_completed = 0
        self.current_scene_id = None
        self.start_time = time.time()
        self.assets_loaded, self.parameters = [], {}
        self.current_output_dir, self.writers = None, {}
        self.bproc_initialized = False

        # 3. Telemetry Emitter (PUB) - Created lazily in run() to avoid
        #    binding ZMQ sockets that may never be used (e.g. in tests).
        self.telemetry_port = self.mgmt_port + 1000
        self.worker_id = f"worker-{shard_id}"
        self.emitter: TelemetryEmitter | None = None

    def _check_memory(self) -> bool:
        """Checks current memory usage and triggers shutdown if limit exceeded.

        On breach, sets status to RESOURCE_LIMIT_EXCEEDED and clears
        ``self.running`` so both the task and management loops exit.

        Returns:
            True if memory is within limits, False if exceeded.
        """
        if self.memory_limit_mb is None:
            return True

        import gc
        import os

        import psutil

        # Trigger GC before final measurement to avoid premature restarts
        gc.collect()

        process = psutil.Process(os.getpid())
        current_usage_mb = process.memory_info().rss / (1024 * 1024)

        if current_usage_mb > self.memory_limit_mb:
            logger.warning(
                f"Memory limit exceeded: {current_usage_mb:.1f}MB > {self.memory_limit_mb}MB. "
                "Initiating preventative restart."
            )
            with self._lock:
                self.status = WorkerStatus.RESOURCE_LIMIT_EXCEEDED
            self.running = False
            return False

        return True

    def get_telemetry(self) -> Telemetry:
        """Returns current worker health and state metrics (Thread Safe)."""
        import os

        import psutil

        process = psutil.Process(os.getpid())

        with self._lock:
            # Blender specific metrics
            obj_count = 0
            if bridge.bpy:
                import contextlib

                # Suppress bpy errors so telemetry never crashes the worker.
                with contextlib.suppress(Exception):
                    obj_count = len(bridge.bpy.data.objects)

            return Telemetry(
                status=self.status,
                vram_used_mb=0.0,  # Placeholder for GPU VRAM
                vram_total_mb=0.0,
                cpu_usage_percent=process.cpu_percent(),
                state_hash=calculate_state_hash(self.assets_loaded, self.parameters),
                uptime_seconds=time.time() - self.start_time,
                ram_used_mb=process.memory_info().rss / (1024 * 1024),
                object_count=obj_count,
                active_scene_id=self.current_scene_id,
            )

    def _setup_writers(self, output_dir: Path, shard_id: str):
        """Initialize data writers for the current shard.

        Idempotent per output directory: returns early when writers for
        ``output_dir`` already exist.
        """
        if self.current_output_dir == output_dir:
            return
        self.current_output_dir = output_dir
        output_dir.mkdir(parents=True, exist_ok=True)
        self.writers = {
            "csv": CSVWriter(output_dir / f"tags_shard_{shard_id}.csv"),
            "coco": COCOWriter(
                output_dir,
                filename=f"coco_shard_{shard_id}.json",
                # Margin comes from the job spec when provided; 0 otherwise.
                eval_margin_px=(
                    self.job_spec.scene_config.camera.eval_margin_px
                    if self.job_spec is not None
                    else 0
                ),
            ),
            "rich": RichTruthWriter(
                output_dir / f"rich_truth_shard_{shard_id}.json",
                eval_margin_px=(
                    self.job_spec.scene_config.camera.eval_margin_px
                    if self.job_spec is not None
                    else 0
                ),
            ),
            "provenance": ProvenanceWriter(output_dir / f"provenance_shard_{shard_id}.json"),
        }
        self.writers["csv"]._ensure_initialized()

    def _finalize_writers(self):
        """Flush and save data from active writers."""
        if not self.writers:
            return

        logger.info(f"Finalizing data for {len(self.writers)} writers...")
        for name, w in self.writers.items():
            # Writers without a save() method are skipped silently.
            save_func = getattr(w, "save", None)
            if callable(save_func):
                try:
                    save_func()
                    logger.debug(f"Saved {name} writer")
                except Exception as e:
                    logger.error(f"Error saving {name} writer: {e}")
        # Clear writers so we don't save multiple times if not needed
        self.writers = {}

    def stop(self):
        """Stops the server loop and closes sockets.

        Teardown is best-effort: socket/context errors are swallowed because
        stop() may run after a partial __init__ or a prior close.
        """
        self.running = False
        if hasattr(self, "emitter") and self.emitter is not None:
            self.emitter.stop()
        try:
            if hasattr(self, "task_socket") and self.task_socket:
                self.task_socket.close(linger=0)
            if hasattr(self, "mgmt_socket") and self.mgmt_socket:
                self.mgmt_socket.close(linger=0)
            if hasattr(self, "context") and self.context:
                self.context.term()
        except Exception:
            pass

    def _mgmt_loop(self):
        """Dedicated thread for handling management requests (Heartbeats).

        Only STATUS is served here; any other command gets a FAILURE response.
        The loop exits when ``self.running`` clears, the memory limit trips,
        or the ZMQ context is terminated.
        """
        logger.info(f"Management thread started on port {self.mgmt_port}")
        poller = zmq.Poller()
        poller.register(self.mgmt_socket, zmq.POLLIN)

        while self.running:
            try:
                # Periodic memory check
                if not self._check_memory():
                    break

                # 500 ms poll timeout keeps the memory check periodic.
                socks = dict(poller.poll(500))
                if self.mgmt_socket in socks:
                    msg = self.mgmt_socket.recv_string()
                    cmd = Command.model_validate_json(msg)

                    if cmd.command_type == CommandType.STATUS:
                        resp = self._on_status(cmd)
                        self.mgmt_socket.send_string(resp.model_dump_json())
                    else:
                        resp = Response(
                            status=ResponseStatus.FAILURE,
                            request_id=cmd.request_id,
                            message=f"Command {cmd.command_type} not supported on MGMT channel",
                        )
                        self.mgmt_socket.send_string(resp.model_dump_json())
            except (zmq.ZMQError, zmq.ContextTerminated):
                break
            except Exception as e:
                logger.error(f"MGMT loop error: {e}")

    def run(self, max_renders: int | None = None):
        """Starts the server loop.

        Args:
            max_renders: Optional cap on completed renders; when reached the
                worker finalizes its writers and shuts down.
        """
        self.running = True
        logger.info(f"Worker task server started on port {self.port}")

        # Start management thread
        mgmt_thread = threading.Thread(target=self._mgmt_loop, daemon=True)
        mgmt_thread.start()

        # Start async telemetry emitter
        self.emitter = TelemetryEmitter(
            worker_id=self.worker_id, port=self.telemetry_port, server_ref=self
        )
        self.emitter.start()

        while self.running:
            try:
                # Periodic memory check
                if not self._check_memory():
                    break

                # 1 s poll timeout so the memory check runs even when idle.
                if not self.task_socket.poll(1000):
                    continue

                msg = self.task_socket.recv_string()
                cmd = Command.model_validate_json(msg)

                # Execute command (Blocks the task loop, but mgmt thread stays alive)
                resp = self._handle_command(cmd)

                at_limit = False
                with self._lock:
                    at_limit = bool(max_renders and self.renders_completed >= max_renders)
                    if at_limit:
                        self.status = WorkerStatus.FINISHED

                if at_limit:
                    logger.info("Worker reached max renders limit. Finalizing data...")
                    self._finalize_writers()

                # Reply is sent even when shutting down, so the client is not left hanging.
                self.task_socket.send_string(resp.model_dump_json())

                if at_limit or cmd.command_type == CommandType.SHUTDOWN:
                    # Brief pause lets the reply flush before the loop stops.
                    time.sleep(0.1)
                    self.running = False
            except (zmq.ZMQError, zmq.ContextTerminated):
                if not self.running:
                    break
                logger.error("ZMQ error in task loop", exc_info=True)
            except Exception as e:
                logger.error(f"Task loop error: {e}", exc_info=True)

        # Final flush in case the loop exited without hitting the limit path.
        self._finalize_writers()

    def _handle_command(self, cmd: Command) -> Response:
        """Dispatch a command to the appropriate handler.

        Handler exceptions are caught and converted to FAILURE responses so
        the REP socket always gets a reply.
        """
        handlers = {
            CommandType.STATUS: self._on_status,
            CommandType.INIT: self._on_init,
            CommandType.RENDER: self._on_render,
            CommandType.RESET: self._on_reset,
            CommandType.SHUTDOWN: self._on_shutdown,
        }
        handler = handlers.get(cmd.command_type)
        if not handler:
            return Response(
                status=ResponseStatus.FAILURE, request_id=cmd.request_id, message="Unknown command"
            )

        try:
            return handler(cmd)
        except Exception as e:
            logger.exception(f"Command {cmd.command_type} failed: {e}")
            return Response(
                status=ResponseStatus.FAILURE, request_id=cmd.request_id, message=str(e)
            )

    def _on_status(self, cmd: Command) -> Response:
        """Handle STATUS command (Health check)."""
        return Response(
            status=ResponseStatus.SUCCESS,
            request_id=cmd.request_id,
            message="Alive",
            data=self.get_telemetry().model_dump(),
        )

    def _on_init(self, cmd: Command) -> Response:
        """Handle INIT command (Load assets/settings).

        Initializes BlenderProc exactly once, registers new asset paths
        (setting up HDR/EXR environments as backgrounds), and merges parameters.
        """
        with self._lock:
            if not self.bproc_initialized and bridge.bproc:
                bridge.bproc.init()
                bridge.bproc.clean_up()
                self.bproc_initialized = True

            payload = cmd.payload or {}
            for path in payload.get("assets", []):
                if path not in self.assets_loaded:
                    p = Path(path)
                    # HDR/EXR assets double as world backgrounds.
                    if p.exists() and p.suffix.lower() in [".exr", ".hdr"] and bridge.bpy:
                        setup_background(p)
                    self.assets_loaded.append(path)
            self.parameters.update(payload.get("parameters", {}))

            return Response(
                status=ResponseStatus.SUCCESS,
                request_id=cmd.request_id,
                message=f"{len(self.assets_loaded)} assets resident",
            )

    def _on_render(self, cmd: Command) -> Response:
        """Handle RENDER command (Execute recipe).

        Validates the payload against SceneRecipe, sets up per-shard writers,
        derives a deterministic per-scene seed, and runs execute_recipe.
        The finally-block guarantees BUSY status is cleared on any exit path.
        """
        with self._lock:
            self.status = WorkerStatus.BUSY

        try:
            p = cmd.payload
            if not isinstance(p, dict):
                logger.error(f"FATAL: Command payload is not a dict: {type(p)} value={p}")
                return Response(
                    status=ResponseStatus.FAILURE,
                    request_id=cmd.request_id,
                    message=f"Payload error: {type(p)}",
                )

            raw_recipe, output_dir = p.get("recipe"), Path(p.get("output_dir", "."))
            shard_id = p.get("shard_id", self.shard_id)

            # Move-Left: Validate that the incoming recipe matches our rigid schema
            if not isinstance(raw_recipe, SceneRecipe):
                try:
                    recipe = SceneRecipe.model_validate(raw_recipe)
                except Exception as e:
                    logger.error(f"FATAL: SceneRecipe validation failed: {e}")
                    raise e
            else:
                recipe = raw_recipe

            self._setup_writers(output_dir, shard_id)
            scene_id = recipe.scene_id
            with self._lock:
                self.current_scene_id = scene_id

            # Deterministic per-scene seed derived from the global seed.
            render_seed = derive_seed(self.seed, "render", scene_id)

            ctx = RenderContext(
                output_dir=output_dir,
                renderer_mode=p.get("renderer_mode", "cycles"),
                csv_writer=self.writers["csv"],
                coco_writer=self.writers["coco"],
                rich_writer=self.writers["rich"],
                provenance_writer=self.writers["provenance"],
                global_seed=self.seed,
                skip_visibility=p.get("skip_visibility", False),
            )

            execute_recipe(recipe, ctx=ctx, seed=render_seed)

            with self._lock:
                # Post-render memory check
                if not self._check_memory():
                    return Response(
                        status=ResponseStatus.FAILURE,
                        request_id=cmd.request_id,
                        message="RESOURCE_LIMIT_EXCEEDED: Memory limit exceeded after render.",
                    )

                self.renders_completed += 1
                self.status = WorkerStatus.IDLE

            return Response(
                status=ResponseStatus.SUCCESS,
                request_id=cmd.request_id,
                message=f"Rendered scene {scene_id}",
            )
        finally:
            with self._lock:
                # Leave RESOURCE_LIMIT_EXCEEDED / other terminal states intact.
                if self.status == WorkerStatus.BUSY:
                    self.status = WorkerStatus.IDLE

    def _on_reset(self, cmd: Command) -> Response:
        """Handle RESET command (Clear scene)."""
        global_pool.release_all()
        return Response(status=ResponseStatus.SUCCESS, request_id=cmd.request_id, message="Reset")

    def _on_shutdown(self, cmd: Command) -> Response:
        """Handle SHUTDOWN command (Stop server)."""
        self.running = False
        return Response(
            status=ResponseStatus.SUCCESS, request_id=cmd.request_id, message="Shutdown"
        )

    def __enter__(self):
        # Context-manager entry: server is already bound in __init__.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Guarantee socket/context teardown on context-manager exit.
        self.stop()
Functions
get_telemetry
get_telemetry() -> Telemetry

Returns current worker health and state metrics (Thread Safe).

Source code in src/render_tag/backend/worker_server.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def get_telemetry(self) -> Telemetry:
    """Returns current worker health and state metrics (Thread Safe)."""
    import os

    import psutil

    proc = psutil.Process(os.getpid())

    with self._lock:
        # Count Blender datablocks when the bpy bridge is live; swallow any
        # bpy API errors so telemetry can never crash the worker.
        blender_object_count = 0
        if bridge.bpy:
            import contextlib

            with contextlib.suppress(Exception):
                blender_object_count = len(bridge.bpy.data.objects)

        rss_mb = proc.memory_info().rss / (1024 * 1024)
        return Telemetry(
            status=self.status,
            vram_used_mb=0.0,  # Placeholder for GPU VRAM
            vram_total_mb=0.0,
            cpu_usage_percent=proc.cpu_percent(),
            state_hash=calculate_state_hash(self.assets_loaded, self.parameters),
            uptime_seconds=time.time() - self.start_time,
            ram_used_mb=rss_mb,
            object_count=blender_object_count,
            active_scene_id=self.current_scene_id,
        )
run
run(max_renders: int | None = None)

Starts the server loop.

Source code in src/render_tag/backend/worker_server.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def run(self, max_renders: int | None = None):
    """Starts the server loop.

    Args:
        max_renders: Optional cap on completed renders; when reached the
            worker finalizes its writers and shuts down.
    """
    self.running = True
    logger.info(f"Worker task server started on port {self.port}")

    # Start management thread
    mgmt_thread = threading.Thread(target=self._mgmt_loop, daemon=True)
    mgmt_thread.start()

    # Start async telemetry emitter
    self.emitter = TelemetryEmitter(
        worker_id=self.worker_id, port=self.telemetry_port, server_ref=self
    )
    self.emitter.start()

    while self.running:
        try:
            # Periodic memory check
            if not self._check_memory():
                break

            # 1 s poll timeout so the memory check runs even when idle.
            if not self.task_socket.poll(1000):
                continue

            msg = self.task_socket.recv_string()
            cmd = Command.model_validate_json(msg)

            # Execute command (Blocks the task loop, but mgmt thread stays alive)
            resp = self._handle_command(cmd)

            at_limit = False
            with self._lock:
                at_limit = bool(max_renders and self.renders_completed >= max_renders)
                if at_limit:
                    self.status = WorkerStatus.FINISHED

            if at_limit:
                logger.info("Worker reached max renders limit. Finalizing data...")
                self._finalize_writers()

            # Reply is sent even when shutting down, so the client is not left hanging.
            self.task_socket.send_string(resp.model_dump_json())

            if at_limit or cmd.command_type == CommandType.SHUTDOWN:
                # Brief pause lets the reply flush before the loop stops.
                time.sleep(0.1)
                self.running = False
        except (zmq.ZMQError, zmq.ContextTerminated):
            if not self.running:
                break
            logger.error("ZMQ error in task loop", exc_info=True)
        except Exception as e:
            logger.error(f"Task loop error: {e}", exc_info=True)

    # Final flush in case the loop exited without hitting the limit path.
    self._finalize_writers()
stop
stop()

Stops the server loop and closes sockets.

Source code in src/render_tag/backend/worker_server.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def stop(self):
    """Stops the server loop and closes sockets."""
    self.running = False

    emitter = getattr(self, "emitter", None)
    if emitter is not None:
        emitter.stop()

    # Best-effort teardown: stop() may run after a partial __init__,
    # so missing attributes and ZMQ errors are tolerated.
    try:
        for attr in ("task_socket", "mgmt_socket"):
            sock = getattr(self, attr, None)
            if sock:
                sock.close(linger=0)
        ctx = getattr(self, "context", None)
        if ctx:
            ctx.term()
    except Exception:
        pass

Functions