Автоматические выключатели серии ВА69 стали доступны с электронным расцепителем для взаимодействия с другими устройствами по протоколу Modbus. Обычно это рядовая процедура, штатно интегрируемая в систему диспетчеризации или автоматизация конечного заказчика.
Мы же, в целях демонстрации концепции и на примере данного оборудования, пройдёмся с начала до конца.
Физическое подключение
Коммуникация осуществляется через обычную витую пару со всеми стандартными рекомендациями - заземление, установка резистора, экранирование, обеспечение ЭМС, длина провода и так далее. Порт на устройстве с Modbus будет иметь два выхода - А(+) и В(-). Их требуется подключить к соответствующим портам на компьютере. А(+) подключается к А(+) и В(-) подключается к В(-). Если устройств несколько, то они все подключаются последовательно.
В данном примере мы будем подключаться напрямую к компьютеру с операционной системой OS X или Linux. Для физического подключения потребуется USB переходник. Он легко покупается и называется типа RS485 USB, MAX485 или CH301.
Коротко про Modbus
Если очень сильно упростить, то наличие Modbus на устройстве подразумевает наличие процессора в стиле компьютерного. Общеизвестно, что у каждого процессора есть память. В ней закаченная программа хранит все нужные значения. Через протокол Modbus мы, очень упрощено, можем считывать значения в нужных ячейках памяти. Иногда мы можем эти значения менять.
Учитывая восможность одновременной работы в цепи более, чем с одним устройством, во избежании конфликтов, у каждого устройства выставляется свой адрес.
Для каждого устройства предоставляется таблица с адресами нужных параметров. Например:
| Адрес | Длина | Комментарий |
| 0x000B | U16 или 2 байта | Напряжение на фазе А в вольтах х10 |
| 0x000C | U16 или 2 байта | Напряжение на фазе В в вольтах х10 |
| 0x000D | U16 или 2 байта | Напряжение на фазе С в вольтах х10 |
Формат протокола Modbus
Протокол сначала плохо воспринимается визуально, но он достаточно простой. Рассмотрим на примере:
0x01 0x03 0x00 0x0B 0x00 0x06 0xB4 0x0A
0x01 - Первый байт означает адрес устройства. В нашем случае автомат ВА69 работает под номером 1, но это легко меняется с помощью меню на лицевой стороне автомата.
0х03 - Второй байт означает номер команды для выполнения из списка. 3 - читать сразу же несколько регистров. Команда же 6 позволяет записывать значения. Например, менять удаленно код уставки.
0x00 0x0B - Два байта, указывающие нужный адрес. Простоты ради оно же пишется как 0x000B. См таблицу - там находится значение напряжения по фазе А.
0x00 0x06 - Два байта, указывающие сколько байтов вернуть. В нашем случае, см таблицу, мы возвращаем 6 (три раза по 2 байта), чтобы одновременно прочитать напряжения по фазам А, В и С.
0xB4 0x0A - Это высчитываемое значение подписи CRC. Оно используется для того, чтобы принимающее устройство прочитало команду и могло удостоверить, что при передаче не перепутан ни один бит.
Итого. Команда читается следующим образом: "У устройства под номером 1 считай 6 байтов начиная с адреса 0x000B"
Получение CRC
Мы немного отвлекаемся в сторону. Достаточно сказать, что реализацию CRC легко найти на нужном языке легко найти в интернете или использовать ИИ для написания. Приведём пример на Python:
def generate_crc16_table(cls) -> list[int]:
"""Generate a crc16 lookup table.
.. note:: This will only be generated once
"""
result = []
for byte in range(256):
crc = 0x0000
for _ in range(8):
if (byte ^ crc) & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
byte >>= 1
result.append(crc)
return result
Ответ от устройства
Автоматический выключатель ответит через короткое время в аналогичном формате. Его надо будет разобрать. Пример:
0x01 0x03 0x0C 0x04 0x78 0x04 0x78 0x00 0x00 0x07 0xBD 0x07 0xBD 0x00 0x00 0x5D 0x35
0х01 - Означает, что отвечает устройство под номером 1
0х03 - Это устройство отвечает на команду под номером 3
0х0С - Длина ответа будет 12 байтов
0x04 0x78 - Это мы вручную взяли первые два байта ибо значение напряжения, как следует из таблицы, занимает 2 байта. 0x0478 равно 1144 в десятичных (4 x 256 + 120 = 1140). В таблице указан множитель 10. Значит надо поделить на 10, что равно напряжению 114.4 вольта. Это правильно. В целях эксперимента мы подали на автомат 230В, а не 400В. Алгоритм внутри этого не ожидал и разделил на два. Итого напряжение равно 228.8В, что примерно правильно.
Нюансы
Сложность отладки протокола состоит в том, что в случае ошибочного запроса второе устройство просто ничего не ответит. Передаваемые байты достаточно просто отследить, но это не всё. Требуется также задать настройки серийного порта:
Port. В нашем случае операционная система отражает USB переходник аналогично любому другому устройству, а именно как обычный файл, доступный на чтение и запись. Это что-то типа dev/tty.usbserial-10/
Baudrate. Это скорость передачи. Очень важно для синхронизации устройств. Обычно это 9600, но легко меняется через меню на лицевой стороне автомата.
Bytesize. Размерность байта. Примерно всегда равно 8.
Stopbit. Примерно всегда равно 1.
Parity. Четность может принимать значения Even, Odd или None.
Высокоуровневые языки типа Python или Ruby все остальные настройки берут на себя. Настройки на более низком уровне оставим за пределами данного материала. Вы можете обратиться к нашим специалистам за помощью. В любом случае, рекомендуем использовать библиотеки с открытым кодом для абстракции от деталей имплетентации.
Пример на Ruby
Данная реализация основана на библиотеке uart. Код индикативный и не гарантируем работоспобность.
require 'uart'
class ModbusCH341
READ_HOLDING_REGISTERS = 0x03
def initialize(port = '/dev/tty.usbserial-10', baud_rate = 9600, data_bits = 8, stop_bits = 1, parity = :none)
@port = port
@baud_rate = baud_rate
@data_bits = data_bits
@stop_bits = stop_bits
@parity = parity
@serial = nil
end
def connect
@serial = UART.open(@port, @baud_rate, @parity)
true
rescue => e
puts "Error connecting to device: #{e.message}"
false
end
def disconnect
@serial.close if @serial
@serial = nil
end
def read_voltage_a
high, low = read_holding_registers(1, 0x0B, 2)
big_int = (high << 16) | low
big_int.to_f / 10.0
end
def read_holding_registers(slave_address, start_address, quantity)
request = build_request(slave_address, READ_HOLDING_REGISTERS, start_address, quantity)
send_request(request)
end
def build_request(slave_address, function_code, address, value)
request = [
slave_address,
function_code,
(address >> 8) & 0xFF,
address & 0xFF,
(value >> 8) & 0xFF,
value & 0xFF
]
crc = calculate_crc(request)
request + [crc & 0xFF, (crc >> 8) & 0xFF]
end
def send_request(request)
return nil unless @serial
@serial.flush
@serial.write(request.pack('C*'))
sleep(0.1) # Small delay to allow device to process
response = []
while (byte = @serial.getbyte)
response << byte
break if response.length >= 5 && response.length >= response[2] + 5
end
response
end
def calculate_crc(data)
crc = 0xFFFF
data.each do |byte|
crc ^= byte
8.times do
if (crc & 0x0001) != 0
crc = (crc >> 1) ^ 0xA001
else
crc = crc >> 1
end
end
end
crc
end
def parse_register_response(response)
return nil if response.nil? || response.length < 5
slave_address = response[0]
function_code = response[1]
byte_count = response[2]
if function_code == READ_HOLDING_REGISTERS
values = []
(0...byte_count).step(2) do |i|
value = (response[3 + i] << 8) | response[4 + i]
values << value
end
values
else
nil
end
end
end
if __FILE__ == $PROGRAM_NAME
modbus = ModbusCH341.new('/dev/tty.usbserial-10', 9600, 8, 1, '8E1')
if modbus.connect
begin
value = modbus.read_voltage_a
puts "Напряжение по фазе А: #{value}"
ensure
modbus.disconnect
end
end
end
Пример на Python
Реализация основана на библиотеке pymodbus. Код индикативный и не гарантируем работоспобность.
import os
import sys
from collections.abc import Callable
from pymodbus.client import ModbusSerialClient
from pymodbus.exceptions import ModbusException
def env(name: str, default: str) -> str:
return os.getenv(name, default).strip()
def env_int(name: str, default: int) -> int:
raw = os.getenv(name)
if raw is None:
return default
try:
return int(raw)
except ValueError as exc:
raise ValueError(f"{name} must be an integer, got: {raw!r}") from exc
def env_optional_int(name: str) -> int | None:
raw = os.getenv(name)
if raw is None or raw.strip() == "":
return None
try:
return int(raw)
except ValueError as exc:
raise ValueError(f"{name} must be an integer, got: {raw!r}") from exc
def env_bool(name: str, default: bool) -> bool:
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def frame_hex(data: bytes) -> str:
return " ".join(f"{byte:02X}" for byte in data)
def regs_hex(registers: list[int]) -> str:
return " ".join(f"0x{value:04X}" for value in registers)
def packet_tracer(enabled: bool) -> Callable[[bool, bytes], bytes]:
def _trace(is_tx: bool, data: bytes) -> bytes:
if enabled:
direction = "TX" if is_tx else "RX"
print(f"[FRAME {direction}] {frame_hex(data)}")
return data
return _trace
def read_regs(
client: ModbusSerialClient, slave_id: int, address: int, count: int
) -> list[int]:
response = client.read_holding_registers(
address=address,
count=count,
device_id=slave_id,
)
if response.isError():
raise ModbusException(f"Read error at 0x{address:04X}: {response}")
return response.registers
def u32_from_regs(registers: list[int], start_idx: int) -> int:
return (registers[start_idx] << 16) | registers[start_idx + 1]
def main() -> int:
# Keep connection params in env vars so this script can be reused
# across different adapters/devices without code changes.
port = env("MODBUS_PORT", "/dev/tty.usbserial-10")
slave_id = env_int("MODBUS_SLAVE_ID", 1)
baudrate = env_int("MODBUS_BAUDRATE", 9600)
bytesize = env_int("MODBUS_BYTESIZE", 8)
stopbits = env_int("MODBUS_STOPBITS", 1)
timeout = float(env("MODBUS_TIMEOUT", "2.0"))
# STM1Y default communication setting is even parity.
parity = env("MODBUS_PARITY", "O")
ir_to_write = env_optional_int("IR")
raw_mode = env_bool("MODBUS_RAW", False)
trace_mode = env_bool("MODBUS_TRACE", True)
client = ModbusSerialClient(
port,
baudrate=baudrate,
bytesize=bytesize,
stopbits=stopbits,
parity=parity,
timeout=timeout,
retries=1,
reconnect_delay=0,
reconnect_delay_max=0,
trace_packet=packet_tracer(trace_mode),
)
print(
"Connecting via RTU:",
f"port={port}, slave={slave_id}, baud={baudrate}, parity={parity}",
flush=True,
)
if not client.connect():
print("Failed to connect to serial adapter.")
return 2
try:
if ir_to_write is not None:
if not 100 <= ir_to_write <= 250:
print("IR must be in range 100..250 when provided.")
return 3
write_response = client.write_register(
address=0x1000,
value=ir_to_write,
device_id=slave_id,
)
if write_response.isError():
raise ModbusException(f"Write error at 0x1000: {write_response}")
print(f"Wrote IR={ir_to_write} to register 0x1000")
# 0x000B..0x0010: phase+line voltages (U16, scale *10)
volt_regs = read_regs(client, slave_id=slave_id, address=0x000B, count=6)
# 0x0012..0x0017: IA/IB/IC (U32, scale *10)
current_regs = read_regs(client, slave_id=slave_id, address=0x0012, count=6)
# 0x001E: frequency (U16, scale *10)
freq_regs = read_regs(client, slave_id=slave_id, address=0x001E, count=1)
# 0x1000: Ir and Tr
ir_tr_regs = read_regs(client, slave_id=slave_id, address=0x1000, count=2)
ua = volt_regs[0] / 10.0
ub = volt_regs[1] / 10.0
uc = volt_regs[2] / 10.0
uab = volt_regs[3] / 10.0
ubc = volt_regs[4] / 10.0
uca = volt_regs[5] / 10.0
ia = u32_from_regs(current_regs, 0) / 10.0
ib = u32_from_regs(current_regs, 2) / 10.0
ic = u32_from_regs(current_regs, 4) / 10.0
freq = freq_regs[0] / 10.0
ir = ir_tr_regs[0]
tr = ir_tr_regs[1]
print("STM1Y decoded values:")
print(f" Phase voltage: Ua={ua:.1f} V, Ub={ub:.1f} V, Uc={uc:.1f} V")
print(f" Line voltage: Uab={uab:.1f} V, Ubc={ubc:.1f} V, Uca={uca:.1f} V")
print(f" Current: Ia={ia:.1f} A, Ib={ib:.1f} A, Ic={ic:.1f} A")
print(f" Frequency: F={freq:.1f} Hz")
print(f" Ir: {ir:.1f} A")
print(f" Tr: {tr:.1f} sec")
return 0
except (ModbusException, OSError, IndexError) as exc:
print(f"Connection/read failed: {exc}")
return 3
finally:
client.close()
if __name__ == "__main__":
raise SystemExit(main())
Пример на Crystal
Этот язык не столь популярен, но вся внутренняя инфраструктура Texenergo выполнена на нём. Приводим его так как здесь требуется более низкоуровневая настройка и он работает без сторонних библиотек. Код индикативный и не гарантируем работоспобность.
require "option_parser"
require "c/fcntl"
require "c/termios"
module ModbusUART
class Error < Exception
end
struct Request
getter slave_id : UInt8
getter function : UInt8
getter address : UInt16
getter quantity : UInt16
def initialize(@slave_id : UInt8, @function : UInt8, @address : UInt16, @quantity : UInt16)
end
end
def self.parse_int(value : String) : Int32
v = value.strip
(v.starts_with?("0x") || v.starts_with?("0X")) ? v[2..].to_i(16) : v.to_i
end
def self.normalize_device(device : String) : String
return device unless device.starts_with?("/dev/tty.")
cu_device = device.sub("/dev/tty.", "/dev/cu.")
File.exists?(cu_device) ? cu_device : device
end
def self.crc16_modbus(data : Bytes) : UInt16
crc = 0xFFFF_u16
data.each do |byte|
crc = (crc ^ byte.to_u16).to_u16
8.times do
crc = (crc & 0x0001) == 0x0001 ? ((crc >> 1) ^ 0xA001).to_u16 : (crc >> 1).to_u16
end
end
crc
end
def self.request_frame(req : Request) : Bytes
pdu = Bytes[
req.slave_id,
req.function,
((req.address >> 8) & 0xFF).to_u8,
(req.address & 0xFF).to_u8,
((req.quantity >> 8) & 0xFF).to_u8,
(req.quantity & 0xFF).to_u8,
]
crc = crc16_modbus(pdu)
Bytes[pdu[0], pdu[1], pdu[2], pdu[3], pdu[4], pdu[5], (crc & 0xFF).to_u8, ((crc >> 8) & 0xFF).to_u8]
end
def self.hex(bytes : Bytes) : String
bytes.map { |b| b.to_s(16).upcase.rjust(2, '0') }.join(" ")
end
def self.open_configured_serial(device : String, baud : Int32, mode : String) : IO::FileDescriptor
data_bits, parity, stop_bits = parse_mode(mode)
fd = LibC.open(device, LibC::O_RDWR | LibC::O_NONBLOCK)
raise Error.new("open failed for #{device}: #{Errno.value.message}") if fd < 0
current_flags = LibC.fcntl(fd, LibC::F_GETFL)
if current_flags < 0
LibC.close(fd)
raise Error.new("fcntl(F_GETFL) failed: #{Errno.value.message}")
end
if LibC.fcntl(fd, LibC::F_SETFL, current_flags & ~LibC::O_NONBLOCK) < 0
LibC.close(fd)
raise Error.new("fcntl(F_SETFL) failed: #{Errno.value.message}")
end
t = uninitialized LibC::Termios
if LibC.tcgetattr(fd, pointerof(t)) != 0
LibC.close(fd)
raise Error.new("tcgetattr failed: #{Errno.value.message}")
end
LibC.cfmakeraw(pointerof(t))
t.c_cflag &= ~(LibC::CSIZE | LibC::PARENB | LibC::PARODD | LibC::CSTOPB)
t.c_cflag |= parse_data_bits(data_bits)
t.c_cflag |= (LibC::CLOCAL | LibC::CREAD)
case parity
when "N"
when "E" then t.c_cflag |= LibC::PARENB
when "O" then t.c_cflag |= (LibC::PARENB | LibC::PARODD)
else
LibC.close(fd)
raise Error.new("Unsupported parity: #{parity}")
end
t.c_cflag |= LibC::CSTOPB if stop_bits == "2"
speed = parse_baud(baud)
t.c_ispeed = speed
t.c_ospeed = speed
t.c_cc[LibC::VMIN] = 0_u8
{% if LibC.has_constant?(:VTIME) %}
t.c_cc[LibC::VTIME] = 5_u8
{% else %}
# Darwin bindings don't expose VTIME constant; it follows VMIN.
t.c_cc[LibC::VMIN + 1] = 5_u8
{% end %}
if LibC.tcsetattr(fd, LibC::TCSANOW, pointerof(t)) != 0
LibC.close(fd)
raise Error.new("tcsetattr failed: #{Errno.value.message}")
end
IO::FileDescriptor.new(fd)
end
def self.parse_mode(mode : String) : Tuple(String, String, String)
match = /^(\d)([NEO])(\d)$/.match(mode.upcase)
raise Error.new("Invalid mode '#{mode}', expected e.g. 8E1") unless match
{match[1], match[2], match[3]}
end
def self.read_response(io : IO, expected_function : UInt8, timeout : Time::Span) : Bytes
start = Time.instant
out = IO::Memory.new
tmp = Bytes.new(256)
expected_size = 5
loop do
n = io.read(tmp)
if n > 0
out.write(tmp[0, n])
data = out.to_slice
if data.size >= 3
if data[1] == (expected_function | 0x80_u8)
expected_size = 5
else
expected_size = data[2] + 5
end
end
break if data.size >= expected_size
end
break if (Time.instant - start) >= timeout
sleep 20.milliseconds
end
out.to_slice
end
private def self.parse_data_bits(bits : String)
case bits
when "5" then LibC::CS5
when "6" then LibC::CS6
when "7" then LibC::CS7
when "8" then LibC::CS8
else
raise Error.new("Unsupported data bits: #{bits}")
end
end
private def self.parse_baud(baud : Int32)
case baud
when 0 then LibC::B0
when 50 then LibC::B50
when 75 then LibC::B75
when 110 then LibC::B110
when 134 then LibC::B134
when 150 then LibC::B150
when 200 then LibC::B200
when 300 then LibC::B300
when 600 then LibC::B600
when 1200 then LibC::B1200
when 1800 then LibC::B1800
when 2400 then LibC::B2400
when 4800 then LibC::B4800
when 9600 then LibC::B9600
when 19200 then LibC::B19200
when 38400 then LibC::B38400
else
raise Error.new("Unsupported baud rate: #{baud}")
end
end
end
device = "/dev/tty.usbserial-10"
baud = 9600
mode = "8E1"
slave_id = 1_u8
function = 0x03_u8
address = 0x000B_u16
quantity = 6_u16
timeout_seconds = 1.5
OptionParser.parse do |parser|
parser.banner = "Usage: crystal run scripts/modbus_uart.cr -- [options]"
parser.on("--device DEVICE", "Serial device (default: #{device})") { |v| device = v }
parser.on("--baud BAUD", "Baud rate (default: #{baud})") { |v| baud = v.to_i }
parser.on("--mode MODE", "UART mode 8N1/8E1/8O1 (default: #{mode})") { |v| mode = v }
parser.on("--slave ID", "Modbus slave id (default: #{slave_id})") { |v| slave_id = v.to_u8 }
parser.on("--function CODE", "Function dec/hex (default: 0x03)") { |v| function = ModbusUART.parse_int(v).to_u8 }
parser.on("--address ADDR", "Start address dec/hex (default: 0x000B)") { |v| address = ModbusUART.parse_int(v).to_u16 }
parser.on("--quantity N", "Quantity (default: #{quantity})") { |v| quantity = v.to_i.to_u16 }
parser.on("--timeout SEC", "Read timeout seconds (default: #{timeout_seconds})") { |v| timeout_seconds = v.to_f64 }
parser.on("-h", "--help", "Show this help") { puts parser; exit(0) }
end
device = ModbusUART.normalize_device(device)
request = ModbusUART::Request.new(slave_id: slave_id, function: function, address: address, quantity: quantity)
frame = ModbusUART.request_frame(request)
begin
io = ModbusUART.open_configured_serial(device, baud, mode)
puts "Device : #{device}"
puts "Request: #{ModbusUART.hex(frame)}"
begin
io.read_timeout = timeout_seconds.seconds
io.write(frame)
io.flush
response = ModbusUART.read_response(io, function, timeout_seconds.seconds)
raise ModbusUART::Error.new("No response received") if response.empty?
puts "Response: #{ModbusUART.hex(response)}"
body = response[0, response.size - 2]
recv_crc = ((response[-1].to_u16 << 8) | response[-2].to_u16).to_u16
calc_crc = ModbusUART.crc16_modbus(body)
raise ModbusUART::Error.new("CRC mismatch: recv=0x#{recv_crc.to_s(16).upcase} calc=0x#{calc_crc.to_s(16).upcase}") if recv_crc != calc_crc
if response[1] == (function | 0x80_u8)
raise ModbusUART::Error.new("Modbus exception 0x#{response[2].to_s(16).upcase}")
end
if function.in? 0x03_u8, 0x04_u8
regs = [] of UInt16
i = 0
while i < response[2]
regs << (((response[3 + i].to_u16 << 8) | response[4 + i].to_u16).to_u16)
i += 2
end
puts "Registers: #{regs.join(", ")}"
end
ensure
io.close
end
rescue ex
STDERR.puts "Error: #{ex.message}"
exit(1)
end