Clothing Detection
Clothing-based identification is a key capability in GPSR (General Purpose Service Robot) and EGPSR (Enhanced General Purpose Service Robot) tasks, where the robot may be asked to "find the person wearing a red jacket" or "count how many people are in white shirts." These instructions rely on natural human language and require a system that can understand both visual content and semantic queries.
To address this, the clothing detection module uses a combination of:
- YOLOv8: For person detection and bounding box generation.
- Moondream: A vision-language model to interpret clothing from cropped images and respond to prompts.
- ROS 2 services: To expose the functionality in a modular and scalable manner.
Overview
- Detect all people in the camera frame using YOLOv8.
- Crop the image around each detected person using their bounding box.
- Formulate a language prompt describing the target clothing, e.g., “Is the person wearing a red shirt?”
- Query Moondream with the cropped image and prompt.
- Interpret the result (binary: 1 = match, 0 = no match).
- Return the total count of people matching the description.
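The steps above can be sketched as one small function. This is a minimal illustration under stated assumptions, not the node's implementation: `detect_people` and `ask_model` are hypothetical stand-ins for the YOLOv8 detector and the Moondream query, passed in as plain callables so the flow can be read and run without ROS.

```python
from typing import Any, Callable, Sequence

BBox = tuple[int, int, int, int]  # (x1, y1, x2, y2) in pixels


def count_people_wearing(
    frame: Any,
    color: str,
    clothing: str,
    detect_people: Callable[[Any], Sequence[BBox]],  # hypothetical detector stand-in
    ask_model: Callable[[Any, BBox, str], str],  # hypothetical VLM stand-in
) -> int:
    """Count detected people for whom the model answers "1"."""
    # Build the prompt once; it is the same for every person in the frame.
    prompt = (
        f"Reply only with 1 if the person is wearing a {color} {clothing}. "
        "Otherwise, reply only with 0."
    )
    count = 0
    # One query per detected person, restricted to that person's bounding box.
    for bbox in detect_people(frame):
        reply = ask_model(frame, bbox, prompt)
        # Only an exact "1" (ignoring surrounding whitespace) counts as a match.
        if reply.strip() == "1":
            count += 1
    return count
```

Injecting the detector and the model as arguments is only a convenience for this sketch; in the node itself both are reached through class attributes and a service client.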
Service Callback: count_by_color_callback
This is the entry point when a client wants to count people based on clothing color/type.
def count_by_color_callback(self, request, response):
    """Callback to count people wearing a specific color and clothing."""
    self.get_logger().info("Executing service Count By Color")
    if self.image is None:
        response.success = False
        response.count = 0
        return response
    frame = self.image
    self.output_image = frame.copy()
    clothing = request.clothing
    color = request.color
    self.get_detections(frame, 0)
    count = 0
    for person in self.people:
        x1, y1, x2, y2 = person["bbox"]
        prompt = f"Reply only with 1 if the person is wearing a {color} {clothing}. Otherwise, reply only with 0."
        status, response_q = self.moondream_crop_query(
            prompt, [float(y1), float(x1), float(y2), float(x2)]
        )
        if status:
            response_clean = response_q.strip()
            if response_clean == "1":
                count += 1
                self.get_logger().info(f"Person {count} is wearing a {color} {clothing}.")
            elif response_clean != "0":
                self.get_logger().warn(f"Unexpected response: {response_clean}")
    response.success = True
    response.count = count
    self.get_logger().info(f"People wearing a {color} {clothing}: {count}")
    return response
- The prompt is generated dynamically from the request fields (color, clothing), allowing any combination without hardcoding.
- The model is asked to answer only 1 or 0, which simplifies interpretation and reduces ambiguity.
- Only an exact "1" is counted as a match; any reply other than "1" or "0" is logged as unexpected and ignored.
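Vision-language models do not always follow a "reply only with 1 or 0" instruction to the letter, so the reply check can be factored into a small parser. The sketch below uses a hypothetical helper, `parse_binary_reply`, that is not part of the node. It is slightly more lenient than the exact match used in the callback, because it also tolerates surrounding quotes and a trailing period; that leniency is an assumption of this sketch, not documented behavior.

```python
from typing import Optional


def parse_binary_reply(reply: str) -> Optional[bool]:
    """Map a model reply to True ("1"), False ("0"), or None (anything else)."""
    # Drop surrounding whitespace, then stray quotes or periods at either end.
    cleaned = reply.strip().strip("\"'.").strip()
    if cleaned == "1":
        return True
    if cleaned == "0":
        return False
    # Anything else is treated as unexpected, so the caller can log it.
    return None
```

Returning `None` for unexpected replies keeps the three cases (match, no match, unusable answer) distinct, mirroring the warning branch in the callback.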
Bounding Box Normalization & Moondream Query
Since Moondream expects input in normalized coordinates, we extract and normalize the bounding box for each person before sending the request:
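def moondream_crop_query(self, prompt: str, bbox: list[float]) -> tuple[int, str]:
    """Query Moondream about a region of the current image.

    bbox is [ymin, xmin, ymax, xmax] in pixels.
    Returns (status, answer), where status is 1 on success and 0 on failure.
    """
    self.get_logger().info(f"Querying image with prompt: {prompt}")
    # Normalize pixel coordinates to the [0, 1] range.
    height, width = self.image.shape[:2]
    ymin = bbox[0] / height
    xmin = bbox[1] / width
    ymax = bbox[2] / height
    xmax = bbox[3] / width
    request = CropQuery.Request()
    request.query = prompt
    request.ymin = ymin
    request.xmin = xmin
    request.ymax = ymax
    request.xmax = xmax
    future = self.moondream_client.call_async(request)
    future = self.wait_for_future(future, 15)
    # wait_for_future returns False when it was given no future.
    # result() is None if the call did not finish within the timeout.
    result = future.result() if future else None
    if result is None:
        self.get_logger().error("Moondream service returned None.")
        return 0, "0"
    if result.success:
        self.get_logger().info(f"Moondream result: {result.result}")
        return 1, result.result
    # The service answered but reported a failure. Return a failed status
    # explicitly so the caller can always unpack (status, answer).
    self.get_logger().warn("Moondream query was not successful.")
    return 0, "0"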
- call_async() sends the request without blocking the calling thread, which is necessary when multiple services are used in parallel.
- A custom utility (wait_for_future) waits for the query to complete, up to a timeout, before proceeding.
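The normalization step can be isolated into a pure function, which makes it easy to test without a camera or a running service. The sketch below uses a hypothetical helper, `normalize_bbox`, and adds clamping to [0, 1] for boxes that extend slightly past the image border. The clamping is an assumption of this sketch and is not present in the method above.

```python
def normalize_bbox(
    bbox: tuple[float, float, float, float], width: int, height: int
) -> tuple[float, float, float, float]:
    """Convert a pixel box (x1, y1, x2, y2) to normalized (ymin, xmin, ymax, xmax)."""
    if width <= 0 or height <= 0:
        raise ValueError("width and height must be positive")

    def clamp(value: float) -> float:
        # Keep the coordinate inside the image after normalization.
        return min(max(value, 0.0), 1.0)

    x1, y1, x2, y2 = bbox
    # Note the reordering: the query above takes y before x.
    return (clamp(y1 / height), clamp(x1 / width), clamp(y2 / height), clamp(x2 / width))
```

For a 640x480 frame, the pixel box (160, 120, 480, 360) maps to (0.25, 0.25, 0.75, 0.75).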
Asynchronous Handling with wait_for_future
This helper function waits for the result of an asynchronous Moondream call:
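import time

def wait_for_future(self, future, timeout=5):
    """Wait until the future is done or the timeout (in seconds) expires."""
    # A local variable cannot change while we wait, so fail fast on None.
    if future is None:
        return False
    start_time = time.monotonic()
    while not future.done() and (time.monotonic() - start_time) < timeout:
        # Sleep briefly instead of spinning, so the wait does not occupy a CPU core.
        time.sleep(0.01)
    # On timeout the future is returned unfinished; callers must check result().
    return future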
Why it's needed:
- ROS 2 service clients operate asynchronously by default.
- Without this, you would either:
  - Proceed without a result.
  - Block indefinitely.
- This keeps the node responsive while ensuring results are actually used.
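An alternative to polling is to let the future signal its own completion. The sketch below is one possible variant, not the node's code: `wait_for_done` is a hypothetical helper, and it assumes only that the future exposes `done()` and `add_done_callback()`. Those two methods are available on `concurrent.futures.Future`, which is used here so the example runs without ROS, and on rclpy futures. With rclpy the done callback is normally scheduled through the executor, so this approach also assumes another executor thread is free to run it.

```python
import threading
from concurrent.futures import Future


def wait_for_done(future, timeout: float = 5.0) -> bool:
    """Block until the future completes; return False if the timeout expires."""
    if future.done():
        return True
    done_event = threading.Event()
    # The callback fires once, when the future gets a result or an exception.
    future.add_done_callback(lambda _f: done_event.set())
    # Event.wait sleeps in the OS instead of spinning; it returns False on timeout.
    return done_event.wait(timeout)


if __name__ == "__main__":
    reply = Future()
    # Simulate a service that answers after 50 ms on another thread.
    threading.Timer(0.05, reply.set_result, args=("1",)).start()
    if wait_for_done(reply, timeout=1.0):
        print("reply:", reply.result())
```

The boolean return value makes the timeout case explicit, so the caller does not have to infer it from a `None` result.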
Multithreading & Reentrant Callback Group
Clothing queries might be long-running due to model latency. To prevent blocking other services or the image stream, the node uses a reentrant callback group, which is passed to the service when it is created:
self.count_by_color_service = self.create_service(
    CountByColor,
    COUNT_BY_COLOR_TOPIC,
    self.count_by_color_callback,
    callback_group=self.callback_group,
)
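In main(), the node needs to be spun by a multithreaded executor (for example rclpy's MultiThreadedExecutor); with a single-threaded executor, callbacks in a reentrant group still run one at a time.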
Key Benefits:
- Multiple requests can be processed concurrently.
- Other parts of the node (e.g., image acquisition, pose queries) remain functional.
- The reentrant group allows the same callback to run in parallel if the service is called twice at the same time.
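Running the same callback in parallel has a cost: attributes shared between invocations (in the callback above, self.people and self.output_image appear to be shared in this way) can be overwritten by one request while another is still iterating over them. One possible guard is to keep such state behind a lock and hand each request its own copy. The sketch below uses a hypothetical `SharedDetections` class that is not part of the node.

```python
import threading


class SharedDetections:
    """Thread-safe holder for the latest list of detected people (hypothetical)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._people: list[dict] = []

    def update(self, people: list[dict]) -> None:
        # Replace the stored detections atomically.
        with self._lock:
            self._people = list(people)

    def snapshot(self) -> list[dict]:
        # Return a copy so the caller can iterate without holding the lock.
        with self._lock:
            return list(self._people)
```

A callback would call `snapshot()` once after detection and loop over the returned list, so a concurrent request that calls `update()` cannot change the list mid-loop.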