Process camera images
While the Intrinsic platform provides perception skills such as estimate_pose,
your solution might need custom perception capabilities.
This tutorial shows you how to create a skill that gets an image from a camera,
process it with OpenCV, and output results.
It builds a barcode scanning skill as an example.
To find the complete example, navigate to the bottom of the page.
Setup
You need a solution with a camera. The solution needs to be deployed, and you need a development environment with its target solution set to that deployment. Follow the guides for setting up your development environment and making your first skill if you don't have that yet.
Our example skill is only going to implement the execute interface, and not
the predict interface.
The predict interface returns what a skill would do if executed, without
actually executing it.
It can be used with skills that have multiple valid outputs.
For example, there may be multiple ways to move a robot to the same pose.
This skill is different.
It is either going to see barcodes or it's not, and it needs to execute to
be able to get an image to detect them.
Because we're not using the predict interface, your solution's execution mode
must be set to "Full" and not "Draft".
In "Draft" mode only the predict interface is called, so the skill never
does anything in "Draft" mode.
Create a new skill
Create a new Python or C++ skill in your development environment, and give it the ID
com.example.scan_barcodes. When prompted for the folder name, give
skills/scan_barcodes.
- Python
Python type annotations are used in this guide.
If using Python, then add this import to the top of scan_barcodes.py
from typing import List
Parameters and Outputs
The parameters and outputs of the skill need to be decided upon.
They are defined by protobuf messages.
Two messages are defined below: ScanBarcodesParams and ScanBarcodesResult.
The skill doesn't accept any parameters, so the ScanBarcodesParams is empty.
The ScanBarcodesResult message has fields to describe all the barcodes
that are detected.
Put the following into the file scan_barcodes/scan_barcodes.proto.
syntax = "proto3";
package com.example;
message ScanBarcodesParams {}
enum BarcodeType {
BARCODE_UNSPECIFIED = 0;
BARCODE_NONE = 1;
BARCODE_EAN_8 = 2;
BARCODE_EAN_13 = 3;
BARCODE_UPC_A = 4;
BARCODE_UPC_E = 5;
BARCODE_UPC_EAN_EXTENSION = 6;
}
message Corner {
double x = 1;
double y = 2;
}
message Barcode {
BarcodeType type = 1;
string data = 2;
repeated Corner corners = 3;
}
message ScanBarcodesResult {
repeated Barcode barcodes = 1;
}
The skill's manifest must be updated to indicate which protobuf messages
are used for return values. Add (or edit) the return_type specified in
scan_barcodes/scan_barcodes.manifest.textproto to be as follows.
return_type {
message_full_name: "com.example.ScanBarcodesResult"
}
Equipment
The skill needs a camera to be able to see barcodes.
This is done through the Equipment interface.
This interface is separate from parameters, though it causes a drop-down box
to appear under the Parameters tab in Flowstate.
First, give a name for the equipment. This is meant to be a human readable name, as it is shown in the user interface. The skill needs to refer to it later, so define it at the top of the file as a global constant.
- Python
- C++
Add the following to scan_barcodes.py.
# Camera slot name; make sure this matches the skill manifest.
CAMERA_EQUIPMENT_SLOT: str = "camera"
Add the following to scan_barcodes.h.
static constexpr char kCameraSlot[] = "camera";
Next, the skill needs to declare the kind of equipment it needs.
This is done by adding equipment types as dependencies in the skill's
manifest.
The CameraConfig equipment type tells Flowstate the skill needs a camera.
Add (or edit) the dependencies section to
scan_barcodes/scan_barcodes.manifest.textproto such that it looks like the
following.
Note that the key matches the value of your global constant.
dependencies {
required_equipment {
key: "camera"
value {
capability_names: "CameraConfig"
}
}
}
Connect to the camera
Flowstate reads the manifest and provides the skill with information about a camera for it to use when the skill is executed.
There is an API to access the provided camera.
- Python
- C++
Add the following import to the top of scan_barcodes.py.
from intrinsic.perception.client.v1.python.camera import cameras
from intrinsic.perception.client.v1.python.image_utils import Metadata
Add the following to scan_barcodes.h.
#include "intrinsic/perception/service/proto/camera_server.pb.h"
#include "intrinsic/perception/service/proto/camera_server.grpc.pb.h"
Add the following to scan_barcodes.cc.
#include "intrinsic/util/status/status_conversion_grpc.h"
#include "intrinsic/perception/proto/camera_config.pb.h"
#include "intrinsic/perception/service/proto/camera_server.grpc.pb.h"
using ::com::example::ScanBarcodesParams;
using ::com::example::ScanBarcodesResult;
using ::com::example::BarcodeType;
using ::intrinsic_proto::perception::CameraConfig;
using ::intrinsic::skills::ExecuteRequest;
using ::intrinsic_proto::skills::PredictResult;
using ::intrinsic::skills::EquipmentPack;
using ::intrinsic::skills::SkillInterface;
using ::intrinsic::skills::ExecuteContext;
using ::intrinsic::connect::WaitForChannelConnected;
- Python
- C++
Add the following code as the first thing in your skill's execute method.
This code accesses the camera provided by Flowstate. Note that the return type
annotation has been changed to -> scan_barcodes_pb2.ScanBarcodesResult, and
the logging.info(...) statement has been deleted.
@overrides(skill_interface.Skill)
def execute(
self,
request: skill_interface.ExecuteRequest[
scan_barcodes_pb2.ScanBarcodesParams
],
context: skill_interface.ExecuteContext,
) -> scan_barcodes_pb2.ScanBarcodesResult:
# Get camera.
camera = cameras.Camera.create(context, CAMERA_EQUIPMENT_SLOT)
Add the following declaration to the ScanBarcodes class in scan_barcodes.h.
absl::Status
ConnectToCamera(
const intrinsic_proto::resources::ResourceGrpcConnectionInfo& grpc_info,
const intrinsic_proto::perception::CameraConfig& camera_config,
std::unique_ptr<intrinsic_proto::perception::CameraServer::Stub>* camera_stub,
std::string* camera_handle);
Add the following method in scan_barcodes.cc.
This code accesses the camera provided by Flowstate.
absl::Status
ScanBarcodes::ConnectToCamera(
const intrinsic_proto::resources::ResourceGrpcConnectionInfo& grpc_info,
const intrinsic_proto::perception::CameraConfig& camera_config,
std::unique_ptr<intrinsic_proto::perception::CameraServer::Stub>* camera_stub,
std::string* camera_handle)
{
// Connect to the provided camera.
std::string camera_grpc_address = grpc_info.address();
std::string camera_server_instance = grpc_info.server_instance();
grpc::ChannelArguments options;
constexpr int kMaxReceiveMessageSize{-1}; // Put no limit on the size of a message we can receive.
options.SetMaxReceiveMessageSize(kMaxReceiveMessageSize);
auto camera_channel = grpc::CreateCustomChannel(camera_grpc_address, grpc::InsecureChannelCredentials(), options);
INTR_RETURN_IF_ERROR(
WaitForChannelConnected(camera_server_instance, camera_channel, absl::InfiniteFuture()));
*camera_stub = intrinsic_proto::perception::CameraServer::NewStub(camera_channel);
auto client_context = std::make_unique<grpc::ClientContext>();
constexpr const auto kCameraClientTimeout = std::chrono::seconds(5);
client_context->set_deadline(std::chrono::system_clock::now() + kCameraClientTimeout);
if (!camera_server_instance.empty()) {
client_context->AddMetadata("x-resource-instance-name", camera_server_instance);
}
intrinsic_proto::perception::CreateCameraRequest create_request;
intrinsic_proto::perception::CreateCameraResponse create_response;
*create_request.mutable_camera_config() = camera_config;
INTR_RETURN_IF_ERROR(intrinsic::ToAbslStatus((*camera_stub)->CreateCamera(client_context.get(), create_request, &create_response)));
*camera_handle = create_response.camera_handle();
return absl::OkStatus();
}
Modify the beginning of the Execute method to match this code in
scan_barcodes.cc. Delete the default logging statement.
absl::StatusOr<std::unique_ptr<google::protobuf::Message>> ScanBarcodes::Execute(
const ExecuteRequest& request, ExecuteContext& context) {
// Get parameters.
INTR_ASSIGN_OR_RETURN(
auto params, request.params<ScanBarcodesParams>());
// Get equipment.
const EquipmentPack equipment_pack = context.equipment();
INTR_ASSIGN_OR_RETURN(const auto camera_equipment, equipment_pack.GetHandle(kCameraSlot));
intrinsic_proto::perception::CameraConfig camera_config;
camera_equipment.resource_data().at("CameraConfig").contents().UnpackTo(&camera_config);
// Connect to the camera over gRPC.
std::unique_ptr<intrinsic_proto::perception::CameraServer::Stub> camera_stub;
std::string camera_handle;
INTR_RETURN_IF_ERROR(ConnectToCamera(
camera_equipment.connection_info().grpc(), camera_config,
&camera_stub, &camera_handle));
Bazel needs to be informed about the new dependency.
Add the following dependency to scan_barcodes/BUILD.
- Python
- C++
py_library(
...
deps = [
":scan_barcodes_py_pb2",
"@ai_intrinsic_sdks//intrinsic/perception/client/v1/python:image_utils",
"@ai_intrinsic_sdks//intrinsic/perception/client/v1/python/camera:cameras",
...
cc_library(
...
deps = [
"@ai_intrinsic_sdks//intrinsic/perception/proto:camera_config_cc_proto",
"@ai_intrinsic_sdks//intrinsic/perception/service/proto:camera_server_cc_grpc",
...
)
Capture an image
The skill now has everything it needs to use the camera.
First, have the skill capture an image each time it is executed.
Add the following to the execute method, below the code that gets the camera.
- Python
- C++
# Capture from the camera and get the first intensity sensor image as a
# numpy array.
capture_result = camera.capture()
intensity_images = (
s.array
for s in capture_result.sensor_images.values()
if s.array.dtype.metadata is not None
and s.array.dtype.metadata.get(Metadata.Keys.PIXEL_TYPE)
== Metadata.Values.PIXEL_INTENSITY
)
img = next(intensity_images, None)
Add the following declaration to the ScanBarcodes class in scan_barcodes.h.
absl::StatusOr<intrinsic_proto::perception::CaptureResult>
CaptureImage(
const intrinsic_proto::resources::ResourceGrpcConnectionInfo& grpc_info,
intrinsic_proto::perception::CameraServer::Stub* camera_stub,
const std::string& camera_handle);
Add the following code to scan_barcodes.cc to define the method.
absl::StatusOr<intrinsic_proto::perception::CaptureResult>
ScanBarcodes::CaptureImage(
const intrinsic_proto::resources::ResourceGrpcConnectionInfo& grpc_info,
intrinsic_proto::perception::CameraServer::Stub* camera_stub,
const std::string& camera_handle)
{
std::string camera_server_instance = grpc_info.server_instance();
auto client_context = std::make_unique<grpc::ClientContext>();
constexpr const auto kCameraClientTimeout = std::chrono::seconds(5);
client_context->set_deadline(std::chrono::system_clock::now() + kCameraClientTimeout);
if (!camera_server_instance.empty()) {
client_context->AddMetadata("x-resource-instance-name", camera_server_instance);
}
intrinsic_proto::perception::CaptureRequest request;
request.set_camera_handle(camera_handle);
request.mutable_timeout()->set_seconds(5);
request.mutable_post_processing()->set_skip_undistortion(false);
intrinsic_proto::perception::CaptureResponse response;
INTR_RETURN_IF_ERROR(intrinsic::ToAbslStatus(camera_stub->Capture(client_context.get(), request, &response)));
return std::move(*response.mutable_capture_result());
}
Add the following to the Execute method in scan_barcodes.cc to call the
method.
// Get a frame from the camera.
INTR_ASSIGN_OR_RETURN(intrinsic_proto::perception::CaptureResult capture_result,
CaptureImage(camera_equipment.connection_info().grpc(), camera_stub.get(), camera_handle));
Process the image
The skill can get a capture result, but it still needs to do something with it. Next, make it use OpenCV to detect barcodes.
- Python
- C++
Add an import for the OpenCV library to scan_barcodes.py.
import cv2
Add an import for numpy to scan_barcodes.py too.
import numpy as np
Add includes for OpenCV to scan_barcodes.h.
#include "opencv2/objdetect/barcode.hpp"
Then declare a detector attribute on the ScanBarcodes class, and specify its type as a class from OpenCV. Make sure that the detector is initialized when an instance of the class is created.
- Python
- C++
Modify scan_barcodes.py to add a detector object on the class.
class ScanBarcodes(skill_interface.Skill):
"""Skill that connects to a camera resource and scans all visible barcodes using OpenCV."""
detector: cv2.barcode.BarcodeDetector
def __init__(self) -> None:
super().__init__()
self.detector = cv2.barcode.BarcodeDetector()
Add a detector_ member field to the ScanBarcodes class in scan_barcodes.h.
cv::barcode::BarcodeDetector detector_;
The skill now has a detector, but it lacks the code to use it. Add the following to the execute method after the code which gets the camera capture result.
For Python, your package can get these dependencies from the Python Package Index. For more information about this process, see this guide on how to add a pip dependency to Bazel.
- Python
- C++
Add the following to the execute method in scan_barcodes.py.
# Run the detector and check results.
(ok,
decoded_data,
decoded_types,
detected_corners,
) = self.detector.detectAndDecodeWithType(img)
Add the following to the Execute method in scan_barcodes.cc.
// Convert to cv::Mat.
auto image_buffer = capture_result.sensor_images().at(0).buffer();
auto img = cv::Mat(
image_buffer.dimensions().rows(),
image_buffer.dimensions().cols(),
CV_8UC3, // Barcode detector requires unsigned data
// Need unsigned data with no const so it can implicitly cast to void*
const_cast<unsigned char*>(reinterpret_cast<const unsigned char *>(image_buffer.data().c_str())));
// Do the detection.
std::vector<cv::Point2f> detected_corners;
std::vector<std::string> decoded_type;
std::vector<std::string> decoded_data;
try {
detector_.detectAndDecodeWithType(img, decoded_data, decoded_type, detected_corners);
} catch (const cv::Exception& e) {
LOG(ERROR) << e.what();
return absl::UnknownError(e.what());
}
The skill now depends on OpenCV and numpy. Bazel needs to be told about these; however, it is more work this time because these dependencies aren't included with the Intrinsic Flowstate SDK.
- Python
- C++
First add a dependency to rules_python and load the pip extension
if not already present.
bazel_dep(name = "rules_python", version = "0.31.0")
python = use_extension("@rules_python//python/extensions:python.bzl", "python")
python.toolchain(
is_default = True,
python_version = "3.11",
)
pip = use_extension("@rules_python//python/extensions:pip.bzl", "pip")
Add the following to the bottom of your MODULE.bazel file.
pip.parse(
hub_name = "scan_barcodes_pip_deps",
python_version = "3.11",
requirements_lock = "//skills/scan_barcodes:requirements.txt",
)
use_repo(pip, "scan_barcodes_pip_deps")
The repository rule references a file called requirements.txt
in the skills/scan_barcodes/ directory.
Create this file adjacent to scan_barcodes.py file and put the following
content into it.
numpy==1.25.0
opencv-contrib-python-headless==4.8.0.76
opencv-python-headless==4.8.0.76
Lastly Bazel needs to be told that the skill uses these dependencies.
Add the dependencies to the skill's py_library() rule.
"@scan_barcodes_pip_deps//numpy:pkg",
"@scan_barcodes_pip_deps//opencv_contrib_python_headless:pkg",
"@scan_barcodes_pip_deps//opencv_python_headless:pkg",
Create three files in the bazel directory:
BUILD- makes thebazeldirectory a Bazel packagenon_module_deps.bzl- defines a Bazel Module extensionopencv.BUILD- tells Bazel how to build OpenCV
Leave BUILD empty.
Put the following content into bazel/non_module_deps.bzl:
"""
Module extension for non-module dependencies
"""
load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
def _non_module_deps_impl(ctx):
http_archive(
name = "opencv",
sha256 = "9dc6a9a95edc133e165e9f6db9412dd899e28d4e5e4979f17cb5966f4b7f3fb1",
strip_prefix = "opencv-4.8.0",
url = "https://github.com/opencv/opencv/archive/4.8.0.zip",
build_file = "//bazel:opencv.BUILD",
)
non_module_deps_ext = module_extension(implementation = _non_module_deps_impl)
Add the following directives to the bottom of your MODULE.bazel file.
example_non_module_deps = use_extension("//bazel:non_module_deps.bzl", "non_module_deps_ext")
use_repo(example_non_module_deps, "opencv")
Put the following content into bazel/opencv.BUILD.
load("@rules_foreign_cc//foreign_cc:defs.bzl", "cmake")
filegroup(
name = "all_srcs",
srcs = glob(["**"]),
visibility = ["//visibility:public"],
)
cmake(
name = "opencv",
generate_args = [
"-GNinja",
"-DBUILD_LIST=core,imgproc,calib3d,objdetect",
"-D BUILD_SHARED_LIBS:BOOL=OFF",
"-D BUILD_PACKAGE:BOOL=OFF",
"-D BUILD_PERF_TESTS:BOOL=OFF",
"-D BUILD_TESTS:BOOL=OFF",
"-D BUILD_JAVA:BOOL=OFF",
"-D OPENCV_FORCE_3RDPARTY_BUILD:BOOL=ON",
"-D WITH_1394:BOOL=OFF",
"-D WITH_IPP:BOOL=OFF",
"-D WITH_JASPER:BOOL=OFF",
"-D WITH_OPENJPEG:BOOL=OFF",
"-D WITH_PNG:BOOL=OFF",
"-D WITH_TBB:BOOL=OFF",
],
lib_source = ":all_srcs",
out_include_dir = "include/opencv4",
out_static_libs = [
"libopencv_core.a",
"libopencv_imgproc.a",
"libopencv_calib3d.a",
"libopencv_objdetect.a",
"opencv4/3rdparty/libittnotify.a",
"opencv4/3rdparty/libquirc.a",
],
visibility = ["//visibility:public"],
)
Add the following to the deps attribute of the cc_library target named scan_barcodes in scan_barcodes/BUILD.
"@opencv//:opencv",
Output the results from the skill
The skill is doing something useful now.
It's using OpenCV to detect barcodes in a camera image.
However, the detections aren't being output yet.
The skill needs to convert the detections into a ScanBarcodesResult.
- Python
- C++
Add the following method to the ScanBarcodes class.
def convert_to_result_proto(
self,
ok: bool,
decoded_data: List[str],
decoded_types: List[int],
detected_corners: np.ndarray,
) -> scan_barcodes_pb2.ScanBarcodesResult:
if not ok:
return scan_barcodes_pb2.ScanBarcodesResult()
barcodes: List[scan_barcodes_pb2.Barcode] = []
for i, barcode_type in enumerate(decoded_types):
if barcode_type == cv2.barcode.NONE:
continue
barcode_data = decoded_data[i]
barcode_corners = detected_corners[i]
corners: List[scan_barcodes_pb2.Corner] = []
for barcode_corner in barcode_corners:
corner = scan_barcodes_pb2.Corner(
x=barcode_corner[0],
y=barcode_corner[1],
)
corners.append(corner)
barcode = scan_barcodes_pb2.Barcode(
type=convert_barcode_type_to_proto(barcode_type),
data=barcode_data,
corners=corners,
)
barcodes.append(barcode)
return scan_barcodes_pb2.ScanBarcodesResult(barcodes=barcodes)
Add the following include directive to scan_barcodes.h.
#include "skills/scan_barcodes/scan_barcodes.pb.h"
Add the following declaration to the ScanBarcodes class in scan_barcodes.h.
absl::StatusOr<std::unique_ptr<::com::example::ScanBarcodesResult>>
ConvertToResultProto(
const std::vector<std::string>& decoded_data,
const std::vector<std::string>& decoded_types,
const std::vector<cv::Point2f>& detected_corners);
Next add the following definition to scan_barcodes.cc.
absl::StatusOr<std::unique_ptr<ScanBarcodesResult>>
ScanBarcodes::ConvertToResultProto(
const std::vector<std::string>& decoded_data,
const std::vector<std::string>& decoded_types,
const std::vector<cv::Point2f>& detected_corners)
{
auto result = std::make_unique<ScanBarcodesResult>();
constexpr int kNumCorners = 4;
if (decoded_data.size() != decoded_types.size() || (kNumCorners * decoded_types.size()) != detected_corners.size()) {
LOG(ERROR) << "Internal error: barcode detection data had inconsistent sizes."
<< " Please report this as a bug with this skill.";
return absl::InternalError("barcode detection data had inconsistent sizes");
}
for (int d = 0; d < decoded_types.size(); ++d){
const std::string& barcode_data = decoded_data.at(d);
const std::string& barcode_type = decoded_types.at(d);
auto corners_iter = detected_corners.begin() + (d * kNumCorners);
::com::example::Barcode* barcode = result->add_barcodes();
barcode->set_type(ConvertBarcodeTypeToProto(barcode_type));
barcode->set_data(barcode_data);
::com::example::Corner* corner = barcode->add_corners();
for (int c = 0; c < kNumCorners; ++c) {
const cv::Point2f& point = *(corners_iter + c);
corner->set_x(point.x);
corner->set_y(point.y);
}
}
return result;
}
That method expects another function to exist in order to convert the barcode type to the proto. Add this function to the file, but this time make it a top level function instead of a class method.
- Python
- C++
Add the following function to scan_barcodes.py.
def convert_barcode_type_to_proto(
barcode_type: int,
) -> scan_barcodes_pb2.BarcodeType:
"""Convert cv2 barcode type to BarcodeType proto."""
if barcode_type == "EAN_8":
return scan_barcodes_pb2.BARCODE_EAN_8
elif barcode_type == "EAN_13":
return scan_barcodes_pb2.BARCODE_EAN_13
elif barcode_type == "UPC_A":
return scan_barcodes_pb2.BARCODE_UPC_A
elif barcode_type == "UPC_E":
return scan_barcodes_pb2.BARCODE_UPC_E
elif barcode_type == "UPC_EAN_EXTENSION":
return scan_barcodes_pb2.BARCODE_UPC_EAN_EXTENSION
else:
return scan_barcodes_pb2.BARCODE_UNSPECIFIED
Add the following near the top of scan_barcodes.cc as a free function.
BarcodeType
ConvertBarcodeTypeToProto(const std::string & type)
{
// Strings from
// https://github.com/opencv/opencv/blob/
// e8f94182f577894410cc59d5d20979dff69d8878/modules/objdetect/src/
// barcode_decoder/abs_decoder.hpp#L46-L51
if (type == "EAN_8") {
return BarcodeType::BARCODE_EAN_8;
} else if (type == "EAN_13") {
return BarcodeType::BARCODE_EAN_13;
} else if (type == "UPC_E") {
return BarcodeType::BARCODE_UPC_E;
} else if (type == "UPC_A") {
return BarcodeType::BARCODE_UPC_A;
} else if (type == "UPC_EAN_EXTENSION") {
return BarcodeType::BARCODE_UPC_EAN_EXTENSION;
}
return BarcodeType::BARCODE_UNSPECIFIED;
}
The very last steps are to call this function and return the converted results. Add this code to the execute method to convert the results.
- Python
- C++
Add the following to the execute method in scan_barcodes.py.
# Convert result and return.
result = self.convert_to_result_proto(
ok, decoded_data, decoded_types, detected_corners
)
logging.info("ScanBarcodesResult: %s", result)
return result
Add the following to the Execute method in scan_barcodes.cc.
std::unique_ptr<ScanBarcodesResult> result;
INTR_ASSIGN_OR_RETURN(result, ConvertToResultProto(decoded_data, decoded_type, detected_corners));
LOG(INFO) << "Detected "<< decoded_data.size() << " barcode(s).";
return result;
Source code
The full source code for this example is available in the intrinsic-ai/sdk-examples repository.