Skip to content

How to Run Concurrent Detection

Detector supports two modes controlled by max_concurrent_frames at construction.

!!! note "API Constraint" Currently, max_concurrent_frames is only exposed through the DetectorBuilder API. Using the standard locus.Detector() constructor will default to a single-frame pool.

max_concurrent_frames Behaviour
1 (default) Sequential. detect_concurrent processes frames one at a time.
> 1 Parallel. detect_concurrent processes up to N frames simultaneously via Rayon.

threads (intra-frame parallelism) and max_concurrent_frames (inter-frame parallelism) are independent — set both to match your workload.

Building a concurrent detector

import locus

# DetectorBuilder is required to set max_concurrent_frames
detector = (
    locus.DetectorBuilder()
    .with_family(locus.TagFamily.AprilTag36h11)
    .with_threads(4)               # Rayon threads per frame
    .with_max_concurrent_frames(8) # up to 8 frames in parallel
    .build()
)

Single-frame detection (default)

import numpy as np

frame = np.zeros((480, 640), dtype=np.uint8)
result = detector.detect(frame)
print(f"{len(result.ids)} tags")

Single-frame detect supports debug telemetry; detect_concurrent does not.

Batch detection

frames: list[np.ndarray] = [...]   # list of (H, W) uint8 arrays

results = detector.detect_concurrent(
    frames,
    intrinsics=locus.CameraIntrinsics(fx=600.0, fy=600.0, cx=320.0, cy=240.0),
    tag_size=0.166,
)

for i, r in enumerate(results):
    print(f"frame {i}: {len(r.ids)} tags")

The GIL is released for the entire Rayon section. Results are returned in the same order as frames.

Limitations of detect_concurrent: - rejected_corners and rejected_error_rates are always empty. - Debug telemetry is not available. Use detect(debug_telemetry=True) for diagnostic work.

Thread-pool pattern (one detector per thread)

For workloads where each thread processes a continuous stream (e.g. one camera per thread), create one Detector per thread and call detect on each frame:

import threading
import locus

def make_detector() -> locus.Detector:
    return locus.DetectorBuilder().with_family(locus.TagFamily.AprilTag36h11).build()

_local = threading.local()

def detect_one(frame):
    if not hasattr(_local, "detector"):
        _local.detector = make_detector()
    return _local.detector.detect(frame)

This avoids the overhead of detect_concurrent's pool management for per-frame streaming workloads.

Choosing max_concurrent_frames

A good starting point is the number of CPU cores, or the expected batch size if it is smaller:

import os
detector = (
    locus.DetectorBuilder()
    .with_max_concurrent_frames(os.cpu_count())
    .build()
)

If more frames arrive simultaneously than the pool size, Locus allocates temporary overflow contexts (~200 KB each) rather than blocking. This is acceptable for burst traffic but suboptimal under sustained over-subscription.