ROS中Python话题发布与订阅入门|发布自定义话题|消息
ROS中Python话题发布与订阅入门|发布自定义话题|消息#!/usr/bin/env pythonfrom time import sleep, timefrom math import piimport threadingimport mathfrom matplotlib import pyplot as pltimport numpy as npimport rospyfrom sens
·
ROS中Python话题发布与订阅入门|发布自定义话题|消息
#!/usr/bin/env python
from time import sleep, time
from math import pi
import threading
import math
from matplotlib import pyplot as plt
import numpy as np
import rospy
from sensor_msgs.msg import JointState #从sensor_msgs.msg包中导入一个Joint_state类型
from std_msgs.msg import String #从std_msg包中导入一个string类型
#话题发布
#节点订阅
#Ezekiel
class Ez_msg:
def __init__(self):
self.angle = 0
self.delta_x = 1
def Ezekiel_Node_Pub():
# ROS节点初始化
rospy.init_node('Ezekiel_talk', anonymous=True)
# 创建一个Publisher,发布名为/Ezekiel_mok的topic,消息类型为Ezekiel_talk::Pub_Str,队列长度10
Ezekiel_info_pub = rospy.Publisher('/Ezekiel_mok', Pub_Str, queue_size=10)
#设置循环的频率
rate = rospy.Rate(10)
#实体化类
Ez_msg_ = Ez_msg()
while not rospy.is_shutdown():
# 发布消息
Ezekiel_info_pub.publish(Ez_msg_)
rate.sleep()# 按照循环频率延时
rospy.loginfo("Publish laneline message[%d, %d]", Ez_msg_.angle, Ez_msg_.delta_x)
if __name__ == '__main__':
try:
Ezekiel_Node_Pub()
except rospy.ROSInterruptException:
pass
#!/usr/bin/env python
from time import sleep, time
from math import pi
import threading
import math
from matplotlib import pyplot as plt
import numpy as np
import rospy
from sensor_msgs.msg import JointState #从sensor_msgs.msg包中导入一个Joint_state类型
from std_msgs.msg import String #从std_msg包中导入一个string类型
def EzekielInfoCallback(Ez_msg_):
rospy.loginfo("Publish laneline message[%d, %d]", Ez_msg_.angle, Ez_msg_.delta_x)
def Ezekiel_Subscriber():
# ROS节点初始化
rospy.init_node('Ezekiel_Subscriber', anonymous=True)
# 创建一个Subscriber,订阅名为/laneline_info的topic,注册回调函数lanelineInfoCallback
rospy.Subscriber("/Ezekiel_mok", Pub_Str, Ez_InfoCallback)
# 循环等待回调函数
rospy.spin()
if __name__ == '__main__':
Ezekiel_Subscriber()
更多推荐



所有评论(0)