API¶
If you are new to RoboMasterPy, you may want to read Quick Start firstly.
The API documentation here are in Chinese-English contraposition style.
Receive IP Broadcast¶
Robomaster broadcasts its IP address under router mode.
-
robomasterpy.
get_broadcast_ip
(timeout: float = None) → str¶ 接收广播以获取机甲IP
Receive broadcasting IP of Robomaster.
- Parameters
timeout – 等待超时(秒)。 timeout in second
- Returns
机甲IP地址。IP of Robomaster.
Commander¶
Commander is a SDK(client) for Robomaster TCP text API.
-
class
robomasterpy.
Commander
(ip: str = '', timeout: float = 30)¶ -
__init__
(ip: str = '', timeout: float = 30)¶ 创建SDK实例并连接机甲,实例在创建后立即可用。
Create a new SDK instance and connect it to Robomaster. Instance is available immediately after creation.
- Parameters
ip – 可选,机甲IP,可在路由器模式下自动获取。 (Optional) IP of Robomaster, which can be detected automatically under router mode.
timeout – 可选,TCP通讯超时(秒)。 (Optional) TCP timeout in second.
-
armor_event
(attr: str, switch: bool) → str¶ 控制装甲板检测事件上报。
Enable or disable specified armor event.
- Parameters
attr – 事件属性名称,范围见 ARMOR_ENUMS. armor event name, see ARMOR_ENUMS.
switch – 是否开启上报 on/off
- Returns
ok,否则raise。 ok, or raise certain exception.
-
armor_sensitivity
(value: int) → str¶ 设置装甲板打击检测灵敏度。
Update armor sensitivity.
- Parameters
value – 装甲板灵敏度,数值越大,越容易检测到打击。默认灵敏度值为 5. armor sensitivity, the bigger, the more sensitive. Default to 5.
- Returns
ok,否则raise。 ok, or raise certain exception.
-
audio
(switch: bool) → str¶ 音频流开关控制。
Enable or disable audio stream.
- Parameters
switch – 打开/关闭 on/off
- Returns
ok,否则raise。 ok, or raise certain exception.
-
blaster_fire
() → str¶ 控制水弹枪发射一次。
Fire once.
- Returns
ok,否则raise。 ok, or raise certain exception.
-
chassis_move
(x: float = 0, y: float = 0, z: float = 0, speed_xy: float = None, speed_z: float = None) → str¶ 控制底盘运动当指定位置,坐标轴原点为当前位置。
Make chassis move to specified location. The origin is current location.
- Parameters
x – x 轴向运动距离,单位 m movement in x axis, in meter
y – y 轴向运动距离,单位 m movement in y axis, in meter
z – z 轴向旋转角度,单位 ° movement in z axis, in degree
speed_xy – xy 轴向运动速度,单位 m/s speed in both x and y axis, in meter/second
speed_z – z 轴向旋转速度, 单位 °/s speed in z axis, in degree/second
- Return ok
ok,否则raise。 ok, or raise certain exception.
-
chassis_push_off
(position: bool = False, attitude: bool = False, status: bool = False, all: bool = False) → str¶ 关闭底盘中相应属性的信息推送。
Disable chassis push of specified attribution.
- Parameters
position – 是否关闭位置推送。 whether disable position push.
attitude – 是否关闭姿态推送。 whether disable attitude push.
status – 是否关闭状态推送。 whether disable status push.
all – 关闭所有推送。 whether disable all pushes.
- Returns
ok,否则raise。 ok, or raise certain exception.
-
chassis_push_on
(position_freq: int = None, attitude_freq: int = None, status_freq: int = None, all_freq: int = None) → str¶ 打开底盘中相应属性的信息推送,支持的频率 1, 5, 10, 20, 30, 50.
Enable chassis push of specified attribution. Supported frequencies are 1, 5, 10, 20, 30, 50.
- Parameters
position_freq – 位置推送频率,不设定则设为None. position push frequency, None for no-op.
attitude_freq – 姿态推送频率,不设定则设为None. attitude push frequency, None for no-op.
status_freq – 状态推送频率,不设定则设为None. status push frequency, None for no-op.
all_freq – 统一设置所有推送频率,设置则开启所有推送。 update all push frequency, this affects all attribution.
- Returns
ok,否则raise。 ok, or raise certain exception.
-
chassis_speed
(x: float = 0, y: float = 0, z: float = 0) → str¶ 更改底盘运动速度。
Update chassis speed.
- Parameters
x – x 轴向运动速度,单位 m/s speed in x axis, in m/s
y – y 轴向运动速度,单位 m/s speed in y axis, in m/s
z – z 轴向旋转速度,单位 °/s rotation speed in z axis, in °/s
- Returns
ok,否则raise。 ok, or raise certain exception.
-
chassis_wheel
(w1: int = 0, w2: int = 0, w3: int = 0, w4: int = 0) → str¶ 更改底盘轮子速度。
Update chassis wheel rotation speed.
- Parameters
w1 – 右前麦轮速度,单位 rpm w1(right front) wheel speed(rpm)
w2 – 左前麦轮速度,单位 rpm w2(left front) wheel speed(rpm)
w3 – 右后麦轮速度,单位 rpm w3(right back) wheel speed(rpm)
w4 – 左后麦轮速度,单位 rpm w4(left back) wheel speed(rpm)
- Return ok
ok,否则raise。 ok, or raise certain exception.
-
close
()¶ 关闭实例,回收socket资源。注意这个命令并不会发送quit到机甲,避免打扰其他在线的Commander.
Close instance, deallocate system socket resource. Note this method will NOT send quit command to Robomaster, because there may be other Commander still active.
-
do
(*args) → str¶ 执行任意命令。
Execute any command.
- Parameters
args – 命令内容。 command content.
- Returns
命令返回。 the response of the command.
-
get_armor_sensitivity
() → int¶ 获取装甲板打击检测灵敏度。
Query for armor sensitivity.
- Returns
装甲板灵敏度 armor sensitivity.
-
get_chassis_attitude
() → robomasterpy.client.ChassisAttitude¶ 获取底盘姿态信息。
Query for chassis attitude.
- Returns
pitch 轴角度(°),roll 轴角度(°),yaw 轴角度(°)。 pitch, roll, yaw in degree.
-
get_chassis_position
() → robomasterpy.client.ChassisPosition¶ 获取底盘当前的位置(以上电时刻位置为原点)。
Query for chassis location. The origin is where Robomaster powers on.
- Returns
x 轴位置(m),y 轴位置(m),偏航角度(°)。 location consisting of x, y, z, in meter, meter, degree.
-
get_chassis_speed
() → robomasterpy.client.ChassisSpeed¶ 获取底盘速度。
Query for chassis speed.
- Returns
x 轴向运动速度(m/s),y 轴向运动速度(m/s),z 轴向旋转速度(°/s),w1 右前麦轮速度(rpm),w2 左前麦轮速速(rpm),w3 右后麦轮速度(rpm),w4 左后麦轮速度(rpm)。 speed in x axis(m/s), speed in y axis(m/s), rotation speed in z axis(°/s), w1(right front) wheel speed(rpm), w2(left front) wheel speed(rpm), w3(right back) wheel speed(rpm), w4(left back) wheel speed(rpm)
-
get_chassis_status
() → robomasterpy.client.ChassisStatus¶ 获取底盘状态信息。
Query for chassis status.
- Returns
底盘状态,详见 ChassisStatus chassis status, see class ChassisStatus.
-
get_gimbal_attitude
() → robomasterpy.client.GimbalAttitude¶ 获取云台姿态信息。
Query for gimbal attitude.
- Returns
pitch 轴角度(°),yaw 轴角度(°) pitch, yaw in degree
-
get_ip
() → str¶ 返回机甲IP。
get IP that the commander currently connects to.
- Returns
机甲IP。 IP that the commander currently connects to
-
get_ir_sensor_distance
(id: int) → float¶ 获取指定 ID 的红外深度传感器距离。
Query for distance reported by specified IR sensor.
- Parameters
id – 红外传感器的 ID ID of IR sensor
- Returns
指定 ID 的红外传感器测得的距离值,单位 mm distance in mm
-
get_robot_mode
() → str¶ 查询当前机甲的运动模式。
Query for Robomaster’s current movement mode.
- Returns
三种模式之一,见enum MODE_*。 Movement mode, refer to enum MODE_*
-
gimbal_move
(pitch: float = 0, yaw: float = 0, pitch_speed: float = None, yaw_speed: float = None) → str¶ 控制云台运动到指定位置,坐标轴原点为当前位置。
Make gimbal move to specified location. The origin is current location.
- Parameters
pitch – pitch 轴角度, 单位 ° pitch delta in degree
yaw – yaw 轴角度, 单位 ° yaw delta in degree
pitch_speed – pitch 轴运动速速,单位 °/s pitch speed in °/s
yaw_speed – yaw 轴运动速速,单位 °/s yaw speed in °/s
- Returns
ok,否则raise。 ok, or raise certain exception.
-
gimbal_moveto
(pitch: float = 0, yaw: float = 0, pitch_speed: float = None, yaw_speed: float = None) → str¶ 控制云台运动到指定位置,坐标轴原点为上电位置。
Make gimbal move to specified location. The origin is gimbal center.
- Parameters
pitch – pitch 轴角度, 单位 ° pitch delta in degree
yaw – yaw 轴角度, 单位 ° yaw delta in degree
pitch_speed – pitch 轴运动速速,单位 °/s pitch speed in °/s
yaw_speed – yaw 轴运动速速,单位 °/s yaw speed in °/s
- Returns
ok,否则raise。 ok, or raise certain exception.
-
gimbal_push_off
(attitude: bool = True) → str¶ 关闭云台中相应属性的信息推送。
Disable gimbal push of specified attribution.
- Parameters
attitude – 关闭姿态推送。 whether disable attitude push.
- Returns
ok,否则raise。 ok, or raise certain exception.
-
gimbal_push_on
(attitude_freq: int = 5) → str¶ 打开云台中相应属性的信息推送,支持的频率 1, 5, 10, 20, 30, 50.
Enable gimbal attribution push. Supported frequencies are 1, 5, 10, 20, 30, 50.
- Parameters
attitude_freq – 姿态推送频率. attitude push frequency.
- Returns
ok,否则raise。 ok, or raise certain exception.
-
gimbal_recenter
()¶ 控制云台回中。
Recenter the gimbal.
- Returns
ok,否则raise。 ok, or raise certain exception.
-
gimbal_resume
()¶ 控制云台从休眠状态中恢复
awake the gimbal if it is suspended(sleeping).
- Returns
ok,否则raise。 ok, or raise certain exception.
-
gimbal_speed
(pitch: float, yaw: float) → str¶ 控制云台运动速度。
Update gimbal speed.
- Parameters
pitch – pitch 轴速度,单位 °/s Pitch speed in °/s
yaw – yaw 轴速度,单位 °/s yaw speed in °/s
- Returns
ok,否则raise。 ok, or raise certain exception.
-
gimbal_suspend
()¶ 使云台进入休眠状态。
Suspend(sleep) the gimbal.
- Returns
ok,否则raise。 ok, or raise certain exception.
-
ir_sensor_measure
(switch: bool = True)¶ 打开/关闭所有红外传感器开关。
Enable or disable all IR sensor.
- Parameters
switch – 打开/关闭 on/off
- Returns
ok,否则raise。 ok, or raise certain exception.
-
led_control
(comp: str, effect: str, r: int, g: int, b: int) → str¶ 控制 LED 灯效。跑马灯效果仅可作用于云台两侧 LED。
Update LED effects. Note scrolling effect works only on gimbal LEDs.
- Parameters
comp – LED 编号,见 LED_ENUMS LED composition, see LED_ENUMS
effect – 灯效类型,见 LED_EFFECT_ENUMS effect type, see LED_EFFECT_ENUMS
r – RGB 红色分量值 RGB red value
g – RGB 绿色分量值 RGB green value
b – RGB 蓝色分量值 RGB blue value
- Returns
ok,否则raise。 ok, or raise certain exception.
-
robot_mode
(mode: str) → str¶ 更改机甲的运动模式。
Update Robomaster’s movement mode.
- Parameters
mode – 三种模式之一,见enum MODE_*。 Movement mode, refer to enum MODE_*
- Returns
ok,否则raise。 ok, or raise certain exception.
-
sound_event
(attr: str, switch: bool) → str¶ 控制声音识别事件上报。
Enable or disable specified sound event.
- Parameters
attr – 事件属性名称,范围见 SOUND_ENUMS. sound event name, see ARMOR_ENUMS.
switch – 是否开启上报 on/off
- Returns
ok,否则raise。 ok, or raise certain exception.
-
stream
(switch: bool) → str¶ 视频流开关控制。
Enable or disable video stream.
- Parameters
switch – 打开/关闭 on/off
- Returns
ok,否则raise。 ok, or raise certain exception.
-
version
() → str¶ 查询当前机甲的SDK版本。
query robomaster SDK version
- Returns
SDK版本号。 SDK version string.
-
Framework¶
For a brief introduction on framework, read Quick Start.
Hub¶
Hub is where workers live, its API is declarative rather than imperative.
-
class
robomasterpy.framework.
Hub
¶ 程序中枢。
使用
self.worker()
注册你的Worker;使用
self.run()
开始运行;使用
Ctrl + C
停止程序。
Hub is the orchestrator.
Use
self.worker()
to register your worker;Use
self.run()
to run;Use
Ctrl + C
for exiting.
-
TERMINATION_TIMEOUT
= 10¶
-
__init__
()¶ 创建Hub不需要提供参数。
Hub does not need parameters to initialize.
-
close
()¶ 让Hub以及Hub名下的所有Worker停止工作。这个方法不需要用户来调用。
Stop hub and workers registered under hub. This method does not need to be called by user.
-
run
()¶ 按注册顺序启动所有worker,阻塞主进程。 Hub在接收到
SIGTERM
或者SIGINT
时会尝试安全退出。Start workers and block the main process. Hub tries to shutdown itself gracefully when receiving
SIGTERM
orSIGINT
.
-
worker
(worker_class, name: str, args: Tuple = (), kwargs=None)¶ 将worker注册到hub. 所有的worker都在独立的进程中运行。
Register worker to hub. All workers run in their own operating system process.
- Parameters
worker_class – worker的类,注意不是worker实例。 class of worker to be registered, note provide the class, instead of an instance.
name – worker的名字,选个好名字可以让调试更容易。 worker’s name. A good name makes debugging less painful.
args – 创建worker需要使用的参数。 args to initialize the worker.
kwargs – 创建worker需要使用的kwargs参数。 kwargs to initialize the worker.
Worker¶
-
class
robomasterpy.framework.
Worker
(name: str, out: Optional[multiprocessing.context.BaseContext.Queue], protocol: Optional[str], address: Tuple[str, int], timeout: Optional[float], loop: bool = True)¶ 用户逻辑的载体,继承这个类然后将你的逻辑写到
work()
方法中即可。 你的逻辑可以是有状态的或无状态的,如果需要,你可以在继承的新类中使用自定义的任意属性保存你的状态。 如果你需要打印日志,使用Worker.logger
属性。 创建你的新类的实例后使用Hub.worker()
注册到Hub实例供其调用。一个Worker子类一般只做一件事情,多个Worker子类各司其职,相互协作,通过
multiprocessing.Queue
进行单向通讯, 最终,负责传感器的Worker的数据会汇聚到负责控制的Worker中,负责控制的Worker再使用Commander
向机甲下令。RoboMasterPy.framework中提供了多个定制化,开箱即用的Worker以满足 接收视频流(Vision)、接收事件(EventListener)和推送(PushListener)、汇聚信息控制机甲(Mind)等常见需求, 请参阅API文档中的“预置Worker”部分。
Worker takes user’s business logic. Inherit this class and write logic code in
work()
method. A worker can be stateful or stateless, at your choice. You may use some user-defined attributes to store your state if the need arises. UseWorker.logger
attribute if some logs need to be printed. Register your well-defined new worker to hub usingHub.worker()
so that hub schedule and calls your logic.One Worker subclass nearly always does only one business. Multiple Worker subclasses do their own jobs and cooperate, communicate through
multiprocessing.Queue
in one-way fashion. Data from those subclasses in charge of sensor flows into subclass in charge of controlling, who command your Robomaster byCommander
.RoboMasterPy.framework provides many customized and out-of-box Worker to cover common usage like receiving video stream(vision), receiving events(EventListener) & pushes(PushListener), gathering info and controlling the Robomaster(Mind), etc. Please refer to “Sugared Workers” section in API doc.
-
QUEUE_TIMEOUT
: float = 0.05¶
-
__init__
(name: str, out: Optional[multiprocessing.context.BaseContext.Queue], protocol: Optional[str], address: Tuple[str, int], timeout: Optional[float], loop: bool = True)¶ 初始化Worker,这个方法一般在子类的__init__()方法中调用,不会直接使用。
Initialize Worker, called in its subclasses’s __init__(), seldom used directly.
- Parameters
name – worker实例的名称,这个名称也会作为进程的名称,你应该使用一个有利于调试的,描述性好的名字。 name of Worker instance, which is also the name of the process. Choose wisely to benefit when debugging.
out – (可选)用于输出产物的multiprocessing.Queue,定义后Worker._outlet()方法可用。 (Optional) a multiprocessing.Queue to put worker’s product. Worker._outlet() is available after this parameter is defined.
protocol – 连接机甲的协议名称,从tcp, udp和None中选择,在tcp和udp选项下Worker._intake()方法可用。 Protocol to use to connect your Robomaster, choose one in tcp, udp, None. Worker._intake() is available under tcp or udp.
address – 机甲的IP地址和端口号,可从Commander.get_ip()中获得机甲IP,端口号和业务有关,见framework.*_PORT. IP and port to Robomaster. IP is available using Commander.get_ip(), and refer to framework.*_PORT for port number.
timeout – tcp或udp的连接超时。 timeout of tcp or udp.
loop – 是否在Worker的生命周期中循环调用work()方法,常见True,在方法提供自己的生命周期管理的时候可选False. Whether call work() method in a loop for Worker’s lifetime, usually True. Set to False when work() has its own lifecycle management.
-
close
()¶ 让Worker停止工作,本方法一般由Hub调用。
Let worker stop. Nearly always called by Hub.
-
property
closed
¶ 当前Worker是否已经永久停止。
Whether the worker has stopped working eventually.
- Returns
True for stopped.
-
get_address
() → Tuple[str, int]¶ 获取Worker连接的IP和port.
get IP and port which the worker currently connects to.
- Returns
(IP, port)
-
property
logger
¶ 使用本属性打印日志。
Use this attribute for logging.
-
property
name
¶ 当前Worker的名字。
The worker’s name.
-
work
() → None¶ 在本方法中实现你的业务逻辑,你可能需要在这里使用下列方法和属性:
使用
self._intake()
方法从tcp或udp中获取数据;使用
self._outlet()
方法将产物放到out中,注意,如果out被没有即时消费的产物填满,self._outlet()会丢弃最新的产物;使用
self.logger
属性打印日志。
Implement your business logic in this method. These methods and attributes may be useful:
use
self._intake()
method to intake data from tcp or udp connection;use
self._outlet()
to put product toout
. Keep in mind ifout
is filled up with unconsumed product, self._outlet() discards the latest products.use
self.logger
for log printing.
-
Sugared Workers¶
RoboMasterPy comes with some sugared worker to satisfy common needs, their names are self-explanatory.
You can always inherit and implement your own worker if sugared ones do not cover your need.
-
class
robomasterpy.framework.
Vision
(name: str, out: Optional[multiprocessing.context.BaseContext.Queue], ip: str, processing: Callable[[…], None], none_is_valid=False)¶ 拉取并解析机甲的视频流,回调函数会收到解析好的OpenCV视频帧,回调函数的返回值会被放置到
out
中。Pull and parse Robomaster’s video stream, call the callback with parsed OpenCV frame, and put return value from callback into
out
.-
__init__
(name: str, out: Optional[multiprocessing.context.BaseContext.Queue], ip: str, processing: Callable[[…], None], none_is_valid=False)¶ 初始化自身。
Initialize self.
- Parameters
name – worker名称 name of worker
out – PushListener会将产物放入其中以供下游消费。 PushListener puts product into
out
for downstream consuming.ip – 机甲的IP,可从Commander.get_ip()取得。 IP of your Robomaster, can be obtained from Commander.get_ip()
processing – 回调函数,每当有新的视频帧到来时,函数都会被Vision调用,形如
processing(frame=frame, logger=self.logger)
, 其中frame为cv2(OpenCV) frame,logger可用于日志打印。 callback function, is called every time when a new frame comes, in formprocessing(frame=frame, logger=self.logger)
, where frame is cv2(OpenCV) frame, and logger is for logging.none_is_valid – 是否在回调函数返回None时将None放入
out
,默认为False. Whether to put None returned from callback function intoout
, default to False.
-
close
()¶ 让Worker停止工作,本方法一般由Hub调用。
Let worker stop. Nearly always called by Hub.
-
work
() → None¶ 在本方法中实现你的业务逻辑,你可能需要在这里使用下列方法和属性:
使用
self._intake()
方法从tcp或udp中获取数据;使用
self._outlet()
方法将产物放到out中,注意,如果out被没有即时消费的产物填满,self._outlet()会丢弃最新的产物;使用
self.logger
属性打印日志。
Implement your business logic in this method. These methods and attributes may be useful:
use
self._intake()
method to intake data from tcp or udp connection;use
self._outlet()
to put product toout
. Keep in mind ifout
is filled up with unconsumed product, self._outlet() discards the latest products.use
self.logger
for log printing.
-
-
class
robomasterpy.framework.
PushListener
(name: str, out: multiprocessing.context.BaseContext.Queue)¶ 监听并解析机甲大师的推送,输出强类型的推送内容。
Listen and parse pushes from Robomaster, product parsed pushes in strong typed manner.
-
__init__
(name: str, out: multiprocessing.context.BaseContext.Queue)¶ 初始化自身。
Initialize self.
- Parameters
name – worker名称 name of worker
out – PushListener会将产物放入其中以供下游消费。 PushListener puts product into
out
for downstream consuming.
-
work
() → None¶ 在本方法中实现你的业务逻辑,你可能需要在这里使用下列方法和属性:
使用
self._intake()
方法从tcp或udp中获取数据;使用
self._outlet()
方法将产物放到out中,注意,如果out被没有即时消费的产物填满,self._outlet()会丢弃最新的产物;使用
self.logger
属性打印日志。
Implement your business logic in this method. These methods and attributes may be useful:
use
self._intake()
method to intake data from tcp or udp connection;use
self._outlet()
to put product toout
. Keep in mind ifout
is filled up with unconsumed product, self._outlet() discards the latest products.use
self.logger
for log printing.
-
-
class
robomasterpy.framework.
EventListener
(name: str, out: multiprocessing.context.BaseContext.Queue, ip: str)¶ 监听并解析机甲大师的事件,输出强类型的推送内容。
Listen and parse events from Robomaster, product parsed events in strong typed manner.
-
__init__
(name: str, out: multiprocessing.context.BaseContext.Queue, ip: str)¶ 初始化自身。
Initialize self.
- Parameters
name – worker名称 name of worker
out – PushListener会将产物放入其中以供下游消费。 PushListener puts product into
out
for downstream consuming.ip – 机甲的IP,可从Commander.get_ip()取得。 IP of your Robomaster, can be obtained from Commander.get_ip()
-
work
() → None¶ 在本方法中实现你的业务逻辑,你可能需要在这里使用下列方法和属性:
使用
self._intake()
方法从tcp或udp中获取数据;使用
self._outlet()
方法将产物放到out中,注意,如果out被没有即时消费的产物填满,self._outlet()会丢弃最新的产物;使用
self.logger
属性打印日志。
Implement your business logic in this method. These methods and attributes may be useful:
use
self._intake()
method to intake data from tcp or udp connection;use
self._outlet()
to put product toout
. Keep in mind ifout
is filled up with unconsumed product, self._outlet() discards the latest products.use
self.logger
for log printing.
-
-
class
robomasterpy.framework.
Mind
(name: str, queues: Tuple[multiprocessing.context.BaseContext.Queue, …], ip: str, processing: Callable[[…], None], timeout: float = 30, loop: bool = True)¶ 无状态的控制者,适用于简单的控制。 对于复杂的,有状态的控制需求,你需要自己继承Worker来实现。
Stateless controller, suits simple and naive controlling scenario. For complicated, stateful controlling, inherit Worker to implement.
-
__init__
(name: str, queues: Tuple[multiprocessing.context.BaseContext.Queue, …], ip: str, processing: Callable[[…], None], timeout: float = 30, loop: bool = True)¶ 初始化自身。
Initialize self.
- Parameters
name – worker名称 name of worker
queues – 队列元组,其中队列的内容由上游提供。 Tuple of mp.Queue, where their contents are provided by upstream.
ip – 机甲的IP,可从Commander.get_ip()取得。 IP of your Robomaster, can be obtained from Commander.get_ip()
processing – 回调函数,调用时参数形如
processing(cmd=self._cmd, queues=self._queues, logger=self.logger)
, 其中cmd为连接到机甲的Commander,queues为输入的队列元组,logger用于日志打印。 callback function, is called in formprocessing(cmd=self._cmd, queues=self._queues, logger=self.logger)
, where cmd is a connected Commander, queue is the input tuple of mp.Queue, logger is for logging.timeout – Commander的连接超时。 timeout for Commander.
loop – 是否循环调用回调函数processing whether calls processing(callback) function in loop, default to True.
-
close
()¶ 让Worker停止工作,本方法一般由Hub调用。
Let worker stop. Nearly always called by Hub.
-
work
() → None¶ 在本方法中实现你的业务逻辑,你可能需要在这里使用下列方法和属性:
使用
self._intake()
方法从tcp或udp中获取数据;使用
self._outlet()
方法将产物放到out中,注意,如果out被没有即时消费的产物填满,self._outlet()会丢弃最新的产物;使用
self.logger
属性打印日志。
Implement your business logic in this method. These methods and attributes may be useful:
use
self._intake()
method to intake data from tcp or udp connection;use
self._outlet()
to put product toout
. Keep in mind ifout
is filled up with unconsumed product, self._outlet() discards the latest products.use
self.logger
for log printing.
-
Helpers¶
Helpers are some good-to-have features that may be useful for your task.
Distance Measure¶
Some helpers for distance measure and analysis on video stream.
-
robomasterpy.measure.
pinhole_distance
(actual_size: float, pixel_size: float, focal_length: float = 710) → float¶ 使用针孔相机模型估测物体到相机的距离。
Estimate distance between camera and object using pinhole camera model.
- Parameters
actual_size – 实际大小,单位米。 Actual object size in meters.
pixel_size – 像素大小,单位像素。 Object size in pixels.
focal_length – (可选)当前分辨率下的等效焦距,和分辨率相关。默认使用1280*720分辨率下的数值。Perceived focal length at specified image resolution, default to value under 1280*720.
- Returns
物体到相机的距离,单位米。 Distance between camera and object, in meters.
-
robomasterpy.measure.
distance_decomposition
(pixel_x: float, distance: float, horizontal_pixels: float = 1280, horizontal_degrees: float = 96) → Tuple[float, float, float]¶ 将距离分解为前进分量(前为正)和侧向分量(右为正)。本函数要求线段的两端在相同海拔高度。
Decomposition distance into forward vector(forward as positive) and lateral vector(right as positive). This function requires that both ends of the line segment are at the same altitude.
- Parameters
pixel_x – 物体在图像上的x坐标,单位像素。 the x coordinate of the object on the image, in pixels.
distance – 距离,单位米。 Distance in meter.
horizontal_pixels – 图像横向的像素数目,默认1280. The number of pixels in the horizontal direction of the image, the default is 1280.
horizontal_degrees – 图像横向的视角大小,默认96. The horizontal viewing angle of the image, the default is 96.
- Returns
前进分量和侧向分量,单位米;水平偏转角度,单位度。 forward vector and lateral vector in meters; horizontal angle in degrees.