简介:本文详细讲解Python如何通过串口(RS232)实现设备通信,涵盖库选择、代码实现、错误处理及优化建议,适合工业控制、仪器仪表开发者。
RS232(推荐标准232)是工业领域广泛使用的串行通信协议,采用差分信号传输,具有抗干扰能力强、传输距离远(通常15米内)的特点。典型应用场景包括:
Python通过pyserial库可高效实现RS232通信,其优势在于:
Windows用户需确认:
Linux用户需检查:
ls /dev/ttyS* # 查看物理串口ls /dev/ttyUSB* # 查看USB转串口设备sudo usermod -aG dialout $USER # 添加用户到拨号组(无需重启)
推荐使用最新稳定版pyserial:
pip install pyserial --upgrade# 验证安装python -c "import serial; print(serial.__version__)"
import serialdef create_serial_connection(port, baudrate=9600, timeout=1):"""创建串口连接:param port: 端口名(如'COM3'或'/dev/ttyUSB0'):param baudrate: 波特率(常用9600/19200/115200):param timeout: 读超时(秒):return: Serial对象"""try:ser = serial.Serial(port=port,baudrate=baudrate,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,timeout=timeout)print(f"成功连接至 {port},参数:{ser.get_settings()}")return serexcept serial.SerialException as e:print(f"连接失败:{str(e)}")return None# 使用示例conn = create_serial_connection('COM3', 115200)
def send_data(ser, data):"""发送字节数据:param ser: Serial对象:param data: bytes或bytearray类型"""if ser and ser.is_open:ser.write(data)print(f"已发送:{data.hex()}")else:print("串口未连接")# 示例:发送Modbus RTU请求modbus_request = bytes.fromhex('01 03 00 00 00 02 C4 0B')send_data(conn, modbus_request)
def receive_data(ser, expected_length=None):"""接收数据(支持长度校验):param ser: Serial对象:param expected_length: 期望接收的字节数:return: 接收到的字节数据"""if not ser or not ser.is_open:return Nonereceived = bytearray()start_time = time.time()while True:if ser.in_waiting > 0:byte = ser.read(1)received.extend(byte)# 简单校验:收到停止位(示例)if len(received) >= expected_length if expected_length else False:breakelse:if time.time() - start_time > ser.timeout:print(f"接收超时,已收{len(received)}字节")breaktime.sleep(0.01) # 避免CPU占用过高print(f"接收到:{received.hex()}")return received
import timedef modbus_rtu_communication(port, slave_id, register_addr, count):"""Modbus RTU读取保持寄存器示例:param port: 串口名称:param slave_id: 从站地址:param register_addr: 寄存器起始地址:param count: 读取寄存器数量"""# 1. 构建请求帧# 请求格式:从站地址(1) + 功能码(1) + 起始地址(2) + 寄存器数(2) + CRC(2)request = bytearray([slave_id,0x03, # 读取保持寄存器(register_addr >> 8) & 0xFF,register_addr & 0xFF,(count >> 8) & 0xFF,count & 0xFF])# 2. 计算CRC校验crc = calculate_crc(request)request.extend(crc)# 3. 通信循环with create_serial_connection(port, 19200) as ser:if ser:# 发送请求send_data(ser, request)# 接收响应(Modbus RTU响应最小长度为5字节)response = receive_data(ser, 5)if response and len(response) >= 5:# 验证CRCreceived_crc = response[-2:]calc_crc = calculate_crc(response[:-2])if received_crc == calc_crc:# 解析数据(跳过从站地址和功能码)byte_count = response[2]register_values = []for i in range(byte_count // 2):start_idx = 3 + i * 2value = (response[start_idx] << 8) | response[start_idx + 1]register_values.append(value)print(f"读取到寄存器值:{register_values}")else:print("CRC校验失败")def calculate_crc(data):"""Modbus CRC16计算"""crc = 0xFFFFfor byte in data:crc ^= bytefor _ in range(8):if crc & 0x0001:crc >>= 1crc ^= 0xA001else:crc >>= 1return bytes([crc & 0xFF, (crc >> 8) & 0xFF])
import threadingimport queueclass SerialCommunicator:def __init__(self, port):self.ser = create_serial_connection(port)self.send_queue = queue.Queue()self.receive_thread = threading.Thread(target=self._receive_loop)self.receive_thread.daemon = Trueself.receive_thread.start()def send_async(self, data):self.send_queue.put(data)def _receive_loop(self):while self.ser and self.ser.is_open:if self.ser.in_waiting > 0:data = self.ser.read(self.ser.in_waiting)# 处理接收到的数据(可添加解析逻辑)print(f"异步接收:{data.hex()}")time.sleep(0.01)def close(self):if self.ser:self.ser.close()
缓冲区管理:
ser = serial.Serial(...)ser.set_buffer_size(rx_size=4096, tx_size=4096) # 增大缓冲区
流控制配置:
ser = serial.Serial(...,xonxoff=True, # 软件流控rtscts=True # 硬件流控(需设备支持))
非阻塞读取:
def non_blocking_read(ser):if ser.in_waiting > 0:return ser.read(ser.in_waiting)return None
端口占用检查:
netstat -ano | findstr "COM3"lsof | grep /dev/ttyUSB0权限问题:
sudo chmod 666 /dev/ttyUSB0 # 临时解决方案
编码转换:
# 接收ASCII数据ascii_data = received_bytes.decode('ascii', errors='ignore')# 接收HEX字符串hex_str = received_bytes.hex().upper()
帧同步策略:
0xAA 0x55)看门狗机制:
class Watchdog:def __init__(self, timeout=5):self.timeout = timeoutself.last_activity = time.time()def touch(self):self.last_activity = time.time()def is_alive(self):return (time.time() - self.last_activity) < self.timeout
日志记录系统:
import logginglogging.basicConfig(filename='serial_com.log',level=logging.DEBUG,format='%(asctime)s - %(levelname)s - %(message)s')
配置管理:
import configparserconfig = configparser.ConfigParser()config.read('serial_config.ini')port = config.get('SERIAL', 'port')baudrate = config.getint('SERIAL', 'baudrate')
Python通过pyserial库实现RS232通信具有高效、灵活的特点。实际开发中需注意:
扩展方向:
完整代码示例已包含基础通信、协议实现和错误处理,开发者可根据实际需求调整参数和逻辑。