# Teleoperating Robot with MagiClaw

MagiClaw provides two types of the gripper, including in-hand type and on-robot type. It allows users to teleoperate their robots when the on-robot type is mounted at the end of the robots, and the in-hand type is handled by human operators. On this page, we will briefly introduce how to read the data streamed from MagiClaw, and demonstrate teleoperating UR7e robot using MagiClaw.

<figure><img src="/files/eESSkqgE4HBAsT9u6TXp" alt=""><figcaption></figcaption></figure>

## How to Read MagiClaw Data

When the MagiClaw is running, the control box keeps sending data via Transmission Control Protocol (TCP) so that external applications can easily subscribe and process the information in real time. Any device on the same network can subscribe to the MagiClaw data stream and decode the messages consistently.

* **Transport Layer:** MagiClaw uses **TCP** sockets to ensure reliable delivery and ordered transmission.
* **Messaging Layer:** Data is sent via **ZeroMQ (ZMQ)**, which provides efficient asynchronous publish/subscribe messaging.
* **Serialization:** All messages are encoded with **Protocol Buffers (Protobuf)**, ensuring compact binary format, cross-language compatibility, and well-defined schemas.

To receive MagiClaw data, an external program needs to:

1. Connect to the TCP port where MagiClaw publishes messages (e.g., `tcp://<magiclaw-ip>:<port>`). The default ports for sending data of `magiclaw_0` and `magiclaw_1` are `6300` and `6400`.
2. Use ZeroMQ’s **SUB** socket to subscribe to the stream.
3. Deserialize the received binary data using the corresponding **Protobuf message definition** (`magiclaw_msg.proto`).

### Protocol Buffers Definition

Here is the definition of the `magiclaw_msg.proto`, and you can also find it in the [repository](https://github.com/asMagiClaw/magiclaw/blob/main/magiclaw/modules/protobuf/magiclaw_msg.proto):&#x20;

```protobuf
syntax = "proto3";

package MagiClaw;

message Motor
{
    float angle = 1;
    float speed = 2;
    float iq = 3;
    int32 temperature = 4;
}

message Claw
{
    float angle = 1;
    Motor motor = 2;
}

message Finger
{
    bytes img = 1;
    repeated float pose = 2;
    repeated float force = 3;
    repeated float node = 4;
}

message Phone
{
    bytes color_img = 1;
    bytes depth_img = 2;
    int32 depth_width = 3;
    int32 depth_height = 4;
    repeated float local_pose = 5;
    repeated float global_pose = 6;
}

message MagiClaw
{
    float timestamp = 1;
    Claw claw = 2;
    Finger finger_0 = 3;
    Finger finger_1 = 4;
    Phone phone = 5;
    repeated float pose = 6;
}
```

Next step is to generate the class you’ll need to read `MagiClaw` message. To do this, you need to run the protocol buffer compiler `protoc` on the `magiclaw_msg.proto`:

1. If you haven’t installed the compiler, [download the package](https://protobuf.dev/downloads) and follow the instructions in the README.
2. Now run the compiler, specifying the source directory (where your application’s source code lives – the current directory is used if you don’t provide a value), the destination directory (where you want the generated code to go; often the same as `$SRC_DIR`), and the path to your `.proto`:&#x20;

```shell
protoc --proto_path=$SRC_DIR --python_out=$DST_DIR $SRC_DIR/magiclaw_msg.proto
```

The `--python_out` option is used since the example below is in Python – similar options are provided for other supported languages.

This generates `magiclaw_msg_pb2.py` in your specified destination directory.

### Python Example

Here is a Python example to read the MagiClaw data using the generated `magiclaw_msg_pb2.py`, and you can also find it in the [repository](https://github.com/asMagiClaw/magiclaw/blob/main/magiclaw/modules/zmq/magiclaw.py):&#x20;

```python
import zmq
import numpy as np
from typing import Tuple
from datetime import datetime
import magiclaw_msg_pb2    # generated from magiclaw_msg.proto

class MagiClawSubscriber:
    def __init__(
        self,
        host: str,
        port: int,
        hwm: int = 1,
        conflate: bool = True,
    ) -> None:
        """
        Subscriber initialization.
        """

        print("{:-^80}".format(" Claw Subscriber Initialization "))
        print(f"Address: tcp://{host}:{port}")

        # Create a ZMQ context
        self.context = zmq.Context()
        # Create a ZMQ subscriber
        self.subscriber = self.context.socket(zmq.SUB)
        # Set high water mark
        self.subscriber.set_hwm(hwm)
        # Set conflate
        self.subscriber.setsockopt(zmq.CONFLATE, conflate)
        # Connect the address
        self.subscriber.connect(f"tcp://{host}:{port}")
        # Subscribe all messages
        self.subscriber.setsockopt_string(zmq.SUBSCRIBE, "")

        print("Claw Subscriber Initialization Done.")
        print("{:-^80}".format(""))

    def subscribeMessage(self):
        """
        Subscribe the message.
        """

        # Receive the message
        msg = self.subscriber.recv()

        # Parse the message
        magiclaw = magiclaw_msg_pb2.MagiClaw()
        magiclaw.ParseFromString(msg)

        # Unpack the message
        claw_angle = magiclaw.claw.angle
        motor_angle = magiclaw.claw.motor.angle
        motor_speed = magiclaw.claw.motor.speed
        motor_iq = magiclaw.claw.motor.iq
        motor_temperature = magiclaw.claw.motor.temperature
        finger_0_img = magiclaw.finger_0.img
        finger_0_pose = np.array(magiclaw.finger_0.pose)
        finger_0_force = np.array(magiclaw.finger_0.force)
        finger_0_node = np.array(magiclaw.finger_0.node).reshape(-1, 3)
        finger_1_img = magiclaw.finger_1.img
        finger_1_pose = np.array(magiclaw.finger_1.pose)
        finger_1_force = np.array(magiclaw.finger_1.force)
        finger_1_node = np.array(magiclaw.finger_1.node).reshape(-1, 3)
        phone_color_img = magiclaw.phone.color_img
        phone_depth_img = magiclaw.phone.depth_img
        phone_local_pose = np.array(magiclaw.phone.local_pose)
        phone_global_pose = np.array(magiclaw.phone.global_pose)
        magiclaw_pose = np.array(magiclaw.pose)

        return (
            claw_angle,
            motor_angle,
            motor_speed,
            motor_iq,
            motor_temperature,
            finger_0_img,
            finger_0_pose,
            finger_0_force,
            finger_0_node,
            finger_1_img,
            finger_1_pose,
            finger_1_force,
            finger_1_node,
            phone_color_img,
            phone_depth_img,
            phone_local_pose,
            phone_global_pose,
            magiclaw_pose,
        )

    def close(self):
        """
        Close ZMQ socket and context to prevent memory leaks.
        """

        if hasattr(self, "subscriber") and self.subscriber:
            self.subscriber.close()
        if hasattr(self, "context") and self.context:
            self.context.term()
```

To use this `MagiClawSubscriber` class, make sure install dependencies:

```bash
pip install pyzmq protobuf
```

Then you can use the following code:

```python
magiclaw_subscriber = MagiClawSubscriber(
    host=<magiclaw_host>,
    port=6300,
)

while True:
    (
        claw_angle,
        motor_angle,
        motor_speed,
        motor_iq,
        motor_temperature,
        finger_0_img,
        finger_0_pose,
        finger_0_force,
        finger_0_node,
        finger_1_img,
        finger_1_pose,
        finger_1_force,
        finger_1_node,
        phone_color_img,
        phone_depth_img,
        phone_local_pose,
        phone_global_pose,
        magiclaw_pose,
    ) = magiclaw_subscriber.subscribeMessage()
    
    print("MagiClaw pose:", magiclaw_pose)    # print data as you want
```

Now you can get multi-modal data from the MagiClaw (ID: 0), including:

* `claw_angle` (`float`): The angle of the claw.
* `motor_angle` (`float`): The angle of the motor.
* `motor_speed` (`float`): The speed of the motor.
* `motor_iq` (`float`): The current of the motor in IQ format.
* `motor_temperature` (`int`): The temperature of the motor in Celsius.
* `finger_0_img_bytes` (`bytes`): The image captured by finger 0.
* `finger_0_pose` (`list`): The pose of finger 0. Default is a zero vector.
* `finger_0_force` (`list`): The force on the bottom surface of finger 0.
* `finger_0_node` (`list`): The node displacement of finger 0.
* `finger_1_img_bytes` (`bytes`): The image captured by finger 1.
* `finger_1_pose` (`list`): The pose of finger 1. Default is a zero vector.
* `finger_1_force` (`list`): The force on the bottom surface of finger 1.
* `finger_1_node` (`list`): The node displacement of finger 1.
* `phone_color_img_bytes` (`bytes`): The color image captured by the phone.
* `phone_depth_img_bytes` (`bytes`): The depth image captured by the phone.
* `phone_depth_width` (`int`): The width of the phone depth image.
* `phone_depth_height` (`int`): The height of the phone depth image.
* `phone_local_pose` (`list`): The local pose of the phone.
* `phone_global_pose` (`list`): The global pose of the phone.
* `magiclaw_pose` (`list`): The pose of MagiClaw.

If you want to read data from the MagiClaw (ID: 1), just change the value of `port` to `6400`.

## Teleoperate UR7e using MagiClaw

MagiClaw can be used as an input device for teleoperation of a UR7e robotic arm.\
By reading the **pose** (position + orientation) from the MagiClaw data stream, we can map it into incremental Cartesian motions and send them to the UR7e in real time.

First, you need to mount the on-robot MagiClaw at the end of UR7e, and connect it to a control box with an XT30 2+2 cable. The in-hand MagiClaw is connected to another control box with an XT30 2+2 cable. Both of the control boxes are connected to a local network via Ethernet, where your PC and UR7e are also connected.

Then you need to modify the bilateral control mode for each control box. You can launch the control box via either SSH or VNC, and change the value of `mode` in `$HOME/magiclaw/configs/bilateral.yaml`  to `leader` for the in-hand MagiClaw and `follower` for the on-robot one.&#x20;

After that, you can copy the following script `teleop.py` to your PC:

```python
import argparse
import time
import rtde_control
import rtde_receive
import numpy as np
import multiprocessing as mp
from multiprocessing import Queue, Event
from scipy.spatial.transform import Slerp, Rotation as R
from magiclaw_subscriber import MagiClawSubscriber


def magiclaw_process(host, data_queue, stop_event, loop_rate=50) -> None:
    magiclaw_subscriber = MagiClawSubscriber(host, 6300)
    _, _, _, _, magiclaw_pose = magiclaw_subscriber.subscribeMessage()
    init_trans = magiclaw_pose[:3]
    init_rot = R.from_quat(magiclaw_pose[3:])
    
    loop_time = 1.0 / loop_rate
    try:
        while not stop_event.is_set():
            start_time = time.time()
            _, _, _, _, magiclaw_pose = magiclaw_subscriber.subscribeMessage()
            trans = magiclaw_pose[:3]
            rot = R.from_quat(magiclaw_pose[3:])
            delta_trans = trans - init_trans
            delta_rot = rot * init_rot.inv()
            data_queue.put((delta_pos.tolist(), delta_rot.as_quat()))

            elapsed_time = time.time() - start_time
            if elapsed_time < loop_time:
                time.sleep(loop_time - elapsed_time)
        magiclaw_subscriber.close()
    except KeyboardInterrupt:
        if not stop_event.is_set():
            stop_event.set()
        magiclaw_subscriber.close()

def ur_control_process(host, data_queue, stop_event, loop_rate=100) -> None:
    rtde_c = rtde_control.RTDEControlInterface(host)
    rtde_r = rtde_receive.RTDEReceiveInterface(host)
    
    initial_pose = np.array(rtde_r.getActualTCPPose())
    initial_trans = initial_pose[:3]
    initial_rot = R.from_rotvec(initial_pose[3:6])

    last_delta_pos = np.zeros(3)
    last_delta_rot = R.from_quat([0, 0, 0, 1])

    loop_time = 1.0 / loop_rate

    try:
        while not stop_event.is_set():
            start_time = time.time()
            while not data_queue.empty():
                delta_pos, delta_rot_quat = data_queue.get()
                last_delta_pos = np.array(delta_pos)
                last_delta_rot = R.from_quat(delta_rot_quat)
            
            new_trans = initial_trans + last_delta_trans
            new_rot = last_delta_rot * initial_rot
            terget_pose = np.concatenate([new_trans, new_rot.as_rotvec()])
            rtde_c.servoL(
                target_pose.tolist(),
                0.2,
                1.0,
                0.02,
                0.1,
                300.0
            )
            
            elapsed_time = time.time() - start_time
            if elapsed_time < loop_time:
                time.sleep(loop_time - elapsed_time)
        rtde_c.servoStop()
        rtde_c.stopScript()
    except KeyboardInterrupt:
        if not stop_event.is_set():
            stop_event.set()
        rtde_c.servoStop()
        rtde_c.stopScript()

def teleoperation(robot_host: str, magiclaw_host: str) -> None:
    mp.set_start_method("spawn", force=True)
    data_queue = Queue()
    stop_event = Event()
    
    ur_control_proc = mp.Process(
        target=ur_control_process, args=(robot_host, data_queue, stop_event)
    )
    ur_control_proc.start()
    time.sleep(1)
    
    magiclaw_proc = mp.Process(
        target=magiclaw_process, args=(magiclaw_host, data_queue, stop_event)
    )
    magiclaw_proc.start()

    try:
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        stop_event.set()
    finally:
        magiclaw_proc.join()
        ur_control_proc.join()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--robot_host",
        type=str,
        help="IP address of the robot",
    )
    parser.add_argument(
        "--magiclaw_host",
        type=str,
        help="IP address of the MagiClaw",
    )
    args = parser.parse_args()
    
    teleoperation(args.robot_host, args.magiclaw_host)
```

Before running the script, install the dependencies:

```bash
pip install ur_rtde pyzmq protobuf
```

{% hint style="info" %}

### **INFO**

If you cannot install `ur_rtde` successfully by `pip`, please follow the [official installation guide](https://sdurobotics.gitlab.io/ur_rtde/installation/installation.html).
{% endhint %}

To start teleoperation, start the two MagiClaws in `teleop` mode via the [app](/magiclaw-docs/documentation/getting-started/quickstart.md#run-via-app), [CLI](/magiclaw-docs/documentation/getting-started/quickstart.md#run-via-cli) or [dashboard](/magiclaw-docs/documentation/getting-started/quickstart.md#launch-dashboard). Then on your PC's terminal, run:

```bash
python teleop.py --robot_host <robot_host> --magiclaw_host <magiclaw_host>
```

where `<robot_host>` refers to the IP address of the UR7e, and `<magiclaw_host>` refers to the IP address of the MagiClaw control box.

{% hint style="danger" %}

### **DANGER**

After starting the program, please always pay attention to the status of the robot and press the emergency stop button if necessary.
{% endhint %}

The delta value of `magiclaw_pose` received from the in-hand MagiClaw relative to the initial state will be converted into the delta value of the robot's Tool Center Point (TCP) to calculate the target of the robot's TCP, achieving continuous teleoperation.


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://magiclaw.gitbook.io/magiclaw-docs/documentation/tutorials/teleoperating-robot-with-magiclaw.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
