Skip to main content

ICON Python client API

This guide describes the ICON Python client API.

Introduction

ICON is a real-time control framework, intended for industrial robot applications.

The client API lets you control ICON-compatible hardware, which may be real or simulated.

You can use the client API to command a compatible robot arm to perform basic motions, such as point-to-point moves in joint space, and linear moves in Cartesian space. Such motions are known as "actions" in the API.

You can also command sequences of motions, as well as state machines of motions, which are executed in hard real-time. This is done by creating several pending actions and chaining them together with transitions, which are known as "reactions" in the API.

Import ICON

Import the ICON client module with:

from icon.python import icon_api

If your code needs to distinguish between different ICON error classes, you may want:

from icon.python import errors as icon_errors

Connect to the server and initialize the client

The ICON client is initialized by connecting to an ICON server at a specified gRPC address using an intrinsic.util.grpc.connection.ConnectionParams object.

The instance name of the robot controller resource is usually provided by an external component, like the skills interface. Its default value is 'robot_controller'.

See Connecting to Services exposed by Assets for detailed information and examples.

Error reporting

Errors are raised as exceptions. If your program needs to handle errors (without crashing), wrap API calls in try blocks.

For example:

  try:
with icon_client.start_session(parts) as session:
# ...
jmove1 = create_action_utils.create_point_to_point_move_action(
action_id=1,
joint_position_part_name="arm",
goal_position=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
)
try:
action = session.add_action(jmove1)
except errors.Session.ActionError as e:
print('Action error:', e)
# ...
except grpc.RpcError as e:
print('Session error:', e)

For the sake of brevity in this documentation, the try blocks are omitted.

Operational State

The ICON server is always in one of the following operational states:

  • DISABLED: Indicates that parts are not ready for sessions to start.
  • FAULTED: Indicates that parts are (or the server as a whole is) in an erroneous state.
  • ENABLED: Indicates that parts are ready for a session to begin.

The OperationalState is part of the OperationalStatus proto.

To check the current operation state:

status = icon_client.get_operational_status()
state = status.state()

match state:
case icon_api.OperationalState.FAULTED:
print('ICON is FAULTED with reason ', status.fault_reason)
case icon_api.OperationalState.DISABLED:
print('ICON is DISABLED')
case icon_api.OperationalState.ENABLED:
print('ICON is ENABLED')
case _:
print('UNKNOWN ICON state \'', state, '\' with reason ', status.fault_reason)

To put the server into the ENABLED state from the DISABLED state:

icon_client.enable()

Disabling the server will immediately end any running sessions, even those created by other clients. To put the server into the DISABLED state from the ENABLED state:

icon_client.disable()

To put the server into the DISABLED state from the FAULTED state:

icon_client.clear_faults()

Parts

A part represents a piece of hardware, either real or simulated, that the server can control. From the client's perspective, a part has:

  • A string name identifier, for example 'arm'.
  • A part-specific configuration, represented as a proto.Any.
  • A part-specific status, represented as a proto.Any.
  • Metadata describing the part and its part-specific data types.

List Parts

To list the available parts:

icon_client.list_parts()
# Example output:
=> ['arm', 'gripper']

Getting Part Configuration

Each part has a configuration that is constant for the lifetime of the ICON server. The configurations for all parts are returned together in the response of icon_client.get_config().

Generic Part Configuration

The configuration message has a field called generic_config, which holds all the information that most users need.

generic_config has optional fields for each Feature Interface. Some are empty messages whose presence indicates whether a Part supports the Feature interface or not, while others have concrete information about the part.

For example:

# Find the configuration message for the part called "arm", and read the number
# of joints.
for part_config in icon_client.get_config().part_configs:
if part_config.name == "arm":
# If the part for some reason doesn't support the JointPosition feature
# interface, this field is unset and we should bail out (more gracefully in
# production code!)
assert(part_config.generic_config.HasField("joint_position_config"))
num_joints = part_config.generic_config.joint_position_config.num_joints

Specialized Part Configuration

Warning: Unless your client code is very specialized towards a certain kind of hardware, you likely don't need this.

The configuration message also contains part-specific configuration that does not fit into the generic_config field, stored as a proto.Any.

An application that expects to handle a specific part implementation can attempt to unpack the proto.Any configuration data and throw a run-time error if its message type is not the one it expects.

For introspection, an application may also use the config_message_type and config_descriptor_set members of the message to unpack the proto.Any message, even if it is not otherwise aware of the message type contained within.

This is useful for generic introspection tools, like those that print a human-readable representation of the configuration.

Sessions

A session:

  1. Claims exclusive control over one or more parts. By doing so, it prevents other client applications from moving those parts while your session is controlling them.
  2. Scopes action instance IDs.
  3. Bounds the lifetime of action-related server-side objects, including actions, reactions and Streams.
  4. Manages action-related event handling. In the implementation of the ICON Python client, this is achieved with a "watcher" thread that is managed by the session object.

To start a session:

with icon_client.start_session(parts) as session:
# ... Code to move robot goes here ...

For example, to control the arm:

with icon_client.start_session(['arm']) as session:
# ... Code to move robot goes here ...

Actions

An action describes how the real-time control layer should control one or more parts.

An action instance is a specific instantiation that controls one or more parts.

From the client's perspective, an action instance has:

  • An action instance ID, which is a session-scoped user-assigned integer.
  • A string action Type name, specifying the action type, for example "intrinsic.point_to_point_move".
  • Action-specific fixed parameters, represented as a proto.Any, which are specified when creating the instance.
  • Action-specific streaming inputs, which can be used to stream data to a running instance. The payloads are represented as proto.Any objects.
  • Action-specific real-time signals, which can be used to signal an event in real-time using Reactions.
  • A list of reactions, which may reference action-specific state variables in their Condition expressions.

Action Signatures

An action signature is the metadata describing a type of action available on the server. It includes the specific types of all fixed parameters, streaming inputs, real-time signals and state variables.

To get the signatures of all available actions:

print(icon_client.list_action_signatures())

If you are only interested in the list of available action type names, the ActionType enum is dynamically generated for you on client connection:

print([action_type for action_type in icon_client.ActionType])
# Example output:
=> [<ActionType.CARTESIAN_POSITION: 'xfa.cartesian_position'>,
<ActionType.STREAMING_JOINT_POSITION_MOVE: 'xfa.streaming_joint_position_move'>,
<ActionType.POINT_TO_POINT_MOVE: 'intrinsic.point_to_point_move'>,
<ActionType.STOP: 'intrinsic.stop'>]

See the ICON actions reference for details of using these actions.

Action Instances

To command a part (or multiple parts), an action instance is created and then started. To create an action instance import the create_action_utils module which provides Python functions to build actions.

from icon.python import create_action_utils

Here is an example of creating an action instance that performs a point-to-point move:

jmove1 = create_action_utils.create_point_to_point_move_action(
action_id=1,
joint_position_part_name="arm",
goal_position=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
)

It is required to manually define a session wide unique action_id and the used parts. In addition the different Create... function have different parameters.

The return value (jmove1) is an instance of the action.

To start the action, which will move the robot arm:

session.start_action(action0.id)

Action instances are long-running and remain active on their parts until either:

  1. The action instance is preempted by calling icon_client.start_action(...) with any of the same parts, or
  2. The action instance is preempted in response to a reaction triggering, or
  3. The session ends, in which case the parts come to a stop.

Reactions

Reactions are an essential and powerful feature of ICON, enabling real-time and non-real-time responses to events that occur in the real-time system.

A reaction has:

  • A condition that is evaluated every control cycle by the server.
  • A list of responses that describe what should happen if the condition is satisfied.

A reaction is associated with an action instance. The reaction's condition is only evaluated (and responses only trigger) while the associated action instance is active.

A reaction's responses are triggered at most once for an active action instance. If the action instance is restarted, the responses may trigger again.

Conditions

A reaction's condition is an expression that references action-specific state variables.

Here are some examples:

cond0 = icon_api.Condition.is_done()
cond1 = icon_api.Condition.is_less_than('distance_to_goal', 0.25)
cond2 = icon_api.Condition.is_greater_than_or_equal('xfa.action_time_elapsed', 8.0)
cond3 = icon_api.Condition.is_approx_equal('distance_to_goal', 0.25)
cond4 = icon_api.Condition.is_true('fts_tared')
cond5 = icon_api.Condition.any_of([
icon_api.Condition.is_greater_than('action_time_elapsed', 10.0),
icon_api.Condition.is_less_than('distance_to_goal', 0.25)])
cond6 = icon_api.Condition.all_of([
icon_api.Condition.is_done(),
icon_api.Condition.is_greater_than('xfa.action_time_elapsed', 3.00)])

Note: the meaning of is_done() is action-specific, and most feedback-controlled actions (such as impedance holds) are never done.

It is also possible to specify conditions on fields of a part status (e.g. joint position sensors of the arm part or forces of the force torque sensor part). The preceding examples used strings directly for creating conditions with action state variables. However, the ICON API offers builder functions to create the correct string (called state variable path) for a part status condition. A part status condition is a Boolean expression over one or more state variable paths. A state variable path is a reference to a field on the part status (e.g., position of joint 3) or a computed state variable from a vector in the part status (e.g., the force magnitude of a force sensor).

Here are some examples:

# Is fulfilled when the signal at index `signal_index` in the digital input
# block named `adio_block_name` on the ADIO part named `adio_part_name` is true.
adio_cond = icon_api.Condition.is_true(
icon_api.StateVariablePath.ADIO.digital_input(
adio_part_name, adio_block_name, signal_index
)
)
# Is fulfilled when the Cartesian velocity at the tip of the arm
# is greater than 0.5 m/s.
high_cart_velocity_cond = icon_api.Condition.is_greater_than(
icon_api.StateVariablePath.Arm.base_linear_velocity_tip_sensed(
arm_part_name
),
0.5,
)

All Python builder functions are available as members of icon_api.StateVariablePath when importing icon.python.icon_api.

Responses

A reaction's response describes what should happen when the reaction's condition is satisfied. This may be:

  • A real-time response:
    • StartActionInRealTime: Starts another action the next control cycle.
    • TriggerRealtimeSignal: Triggers a real-time signal which can be read within the associated action on the next control cycle.
  • A client-side response:
    • TriggerCallback: Triggers a user-specified client-side callback function.
    • Event: Signals a flag, which can be used to unblock a waiting client.

Add a transition between two actions

When the condition is met, another action is started in the next control cycle. Both actions need to be added to the session and afterwards the transition can be created.

stop = create_action_utils.create_stop_action(
action_id=2,
joint_position_part_name=_ARM_PART.value,
)
session.add_actions([jmove1, stop])

session.add_transition(
jmove1,
stop,
high_cart_velocity_cond,
)
session.start_action(jmove1.id)

Wait for an Action to Finish

To block the client, start an action and wait until it reports is-done.

  session.add_action(jmove1)
session.start_action_and_wait(jmove1)

To block the client until a custom condition is met, create a custom event:

  session.add_action(jmove1)
event = session.add_reaction(action_stop, high_cart_velocity_cond)
session.start_action_and_wait(jmove1, event)

Trigger a User Callback

To trigger a user callback in response to a reaction event:

  def handle_event(timestamp, previous_action_id, current_action_id):
print('Almost there!')

session.add_action(jmove1)
session.add_reaction(
jmove1,
icon_api.Condition.is_less_than('distance_to_goal', 0.25),
callback=handle_event,
)
session.start_action(jmove1.id)

Chain Actions in Real-Time

Real-time responses can be used to chain actions in real-time. This can be used to achieve deterministic timing of a sequence of actions. add_action_sequence adds multiple actions to the session and connects them based on the is_done condition. In addition it returns an event on the last is_done in the action sequence.

  jmove1 = create_action_utils.create_point_to_point_move_action(
action_id=1, joint_position_part_name="arm", goal_position=jpos1
)
jstop = create_action_utils.create_stop_action(
action_id=2, joint_position_part_name="arm"
)
jmove2 = create_action_utils.create_point_to_point_move_action(
action_id=3, joint_position_part_name="arm", goal_position=jpos2
)

# Add the actions and chain them together.
done_event = session.add_action_sequence([jmove1, jstop, jmove2])

# Start jmove1 and wait for the last action to finish.
session.start_action_and_wait(jmove1, done_event)

Streams

Certain actions support streaming input. The available stream field names are action-specific. To open a stream:

  # The field_name is action-specific.
stream = session.open_stream(action_id, field_name)

Then write to the stream with:

  # The message type of payload is action-specific.
stream.write(payload)