Modbus на ВА69 с электронным расцепителем

Автоматические выключатели серии ВА69 стали доступны с электронным расцепителем для взаимодействия с другими устройствами по протоколу Modbus. Обычно это рядовая процедура, штатно интегрируемая в систему диспетчеризации или автоматизация конечного заказчика.

Мы же, в целях демонстрации концепции и на примере данного оборудования, пройдёмся с начала до конца.

Физическое подключение

Коммуникация осуществляется через обычную витую пару со всеми стандартными рекомендациями - заземление, установка резистора, экранирование, обеспечение ЭМС, длина провода и так далее. Порт на устройстве с Modbus будет иметь два выхода - А(+) и В(-). Их требуется подключить к соответствующим портам на компьютере. А(+) подключается к А(+) и В(-) подключается к В(-). Если устройств несколько, то они все подключаются последовательно.

В данном примере мы будем подключаться напрямую к компьютеру с операционной системой OS X или Linux. Для физического подключения потребуется USB переходник. Он легко покупается и называется типа RS485 USB, MAX485 или CH301.

Коротко про Modbus

Если очень сильно упростить, то наличие Modbus на устройстве подразумевает наличие процессора в стиле компьютерного. Общеизвестно, что у каждого процессора есть память. В ней закаченная программа хранит все нужные значения. Через протокол Modbus мы, очень упрощено, можем считывать значения в нужных ячейках памяти. Иногда мы можем эти значения менять.

Учитывая восможность одновременной работы в цепи более, чем с одним устройством, во избежании конфликтов, у каждого устройства выставляется свой адрес.

Для каждого устройства предоставляется таблица с адресами нужных параметров. Например:

Адрес Длина Комментарий
0x000B U16 или 2 байта Напряжение на фазе А в вольтах х10
0x000C U16 или 2 байта Напряжение на фазе В в вольтах х10 
0x000D U16 или 2 байта  Напряжение на фазе С в вольтах х10 

Формат протокола Modbus

Протокол сначала плохо воспринимается визуально, но он достаточно простой. Рассмотрим на примере:

0x01 0x03 0x00 0x0B 0x00 0x06 0xB4 0x0A

0x01 - Первый байт означает адрес устройства. В нашем случае автомат ВА69 работает под номером 1, но это легко меняется с помощью меню на лицевой стороне автомата.

0х03 - Второй байт означает номер команды для выполнения из списка. 3 - читать сразу же несколько регистров. Команда же 6 позволяет записывать значения. Например, менять удаленно код уставки.

0x00 0x0B - Два байта, указывающие нужный адрес. Простоты ради оно же пишется как 0x000B. См таблицу - там находится значение напряжения по фазе А.

0x00 0x06 - Два байта, указывающие сколько байтов вернуть. В нашем случае, см таблицу, мы возвращаем 6 (три раза по 2 байта), чтобы одновременно прочитать напряжения по фазам А, В и С. 

0xB4 0x0A - Это высчитываемое значение подписи CRC. Оно используется для того, чтобы принимающее устройство прочитало команду и могло удостоверить, что при передаче не перепутан ни один бит.

Итого. Команда читается следующим образом: "У устройства под номером 1 считай 6 байтов начиная с адреса 0x000B"

Получение CRC

Мы немного отвлекаемся в сторону. Достаточно сказать, что реализацию CRC легко найти на нужном языке легко найти в интернете или использовать ИИ для написания. Приведём пример на Python:

def generate_crc16_table(cls) -> list[int]:
        """Generate a crc16 lookup table.

        .. note:: This will only be generated once
        """
        result = []
        for byte in range(256):
            crc = 0x0000
            for _ in range(8):
                if (byte ^ crc) & 0x0001:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
                byte >>= 1
            result.append(crc)
        return result

Ответ от устройства

Автоматический выключатель ответит через короткое время в аналогичном формате. Его надо будет разобрать. Пример:

0x01 0x03 0x0C 0x04 0x78 0x04 0x78 0x00 0x00 0x07 0xBD 0x07 0xBD 0x00 0x00 0x5D 0x35

0х01 - Означает, что отвечает устройство под номером 1

0х03 - Это устройство отвечает на команду под номером 3

0х0С - Длина ответа будет 12 байтов

0x04 0x78 - Это мы вручную взяли первые два байта ибо значение напряжения, как следует из таблицы, занимает 2 байта. 0x0478 равно 1144 в десятичных (4 x 256 + 120 = 1140). В таблице указан множитель 10. Значит надо поделить на 10, что равно напряжению 114.4 вольта. Это правильно. В целях эксперимента мы подали на автомат 230В, а не 400В. Алгоритм внутри этого не ожидал и разделил на два. Итого напряжение равно 228.8В, что примерно правильно.

Нюансы

Сложность отладки протокола состоит в том, что в случае ошибочного запроса второе устройство просто ничего не ответит. Передаваемые байты достаточно просто отследить, но это не всё. Требуется также задать настройки серийного порта:

Port. В нашем случае операционная система отражает USB переходник аналогично любому другому устройству, а именно как обычный файл, доступный на чтение и запись. Это что-то типа dev/tty.usbserial-10/

Baudrate. Это скорость передачи. Очень важно для синхронизации устройств. Обычно это 9600, но легко меняется через меню на лицевой стороне автомата.

Bytesize. Размерность байта. Примерно всегда равно 8.

Stopbit. Примерно всегда равно 1.

Parity. Четность может принимать значения Even, Odd или None.

Высокоуровневые языки типа Python или Ruby все остальные настройки берут на себя. Настройки на более низком уровне оставим за пределами данного материала. Вы можете обратиться к нашим специалистам за помощью. В любом случае, рекомендуем использовать библиотеки с открытым кодом для абстракции от деталей имплетентации.

Пример на Ruby

Данная реализация основана на библиотеке uart. Код индикативный и не гарантируем работоспобность.


require 'uart'

class ModbusCH341
  READ_HOLDING_REGISTERS = 0x03

  def initialize(port = '/dev/tty.usbserial-10', baud_rate = 9600, data_bits = 8, stop_bits = 1, parity = :none)
    @port = port
    @baud_rate = baud_rate
    @data_bits = data_bits
    @stop_bits = stop_bits
    @parity = parity
    @serial = nil
  end

  def connect
    @serial = UART.open(@port, @baud_rate, @parity)
    true
  rescue => e
    puts "Error connecting to device: #{e.message}"
    false
  end

  def disconnect
    @serial.close if @serial
    @serial = nil
  end

  def read_voltage_a
    high, low = read_holding_registers(1, 0x0B, 2)
    big_int = (high << 16) | low
    big_int.to_f / 10.0
  end

  def read_holding_registers(slave_address, start_address, quantity)
    request = build_request(slave_address, READ_HOLDING_REGISTERS, start_address, quantity)
    send_request(request)
  end

  def build_request(slave_address, function_code, address, value)
    request = [
      slave_address,
      function_code,
      (address >> 8) & 0xFF,
      address & 0xFF,
      (value >> 8) & 0xFF,
      value & 0xFF
    ]
    crc = calculate_crc(request)
    request + [crc & 0xFF, (crc >> 8) & 0xFF]
  end

  def send_request(request)
    return nil unless @serial

    @serial.flush
    @serial.write(request.pack('C*'))
    sleep(0.1) # Small delay to allow device to process

    response = []
    while (byte = @serial.getbyte)
      response << byte
      break if response.length >= 5 && response.length >= response[2] + 5
    end

    response
  end

  def calculate_crc(data)
    crc = 0xFFFF
    data.each do |byte|
      crc ^= byte
      8.times do
        if (crc & 0x0001) != 0
          crc = (crc >> 1) ^ 0xA001
        else
          crc = crc >> 1
        end
      end
    end
    crc
  end

  def parse_register_response(response)
    return nil if response.nil? || response.length < 5
    slave_address = response[0]
    function_code = response[1]
    byte_count = response[2]

    if function_code == READ_HOLDING_REGISTERS
      values = []
      (0...byte_count).step(2) do |i|
        value = (response[3 + i] << 8) | response[4 + i]
        values << value
      end
      values
    else
      nil
    end
  end
end

if __FILE__ == $PROGRAM_NAME
  modbus = ModbusCH341.new('/dev/tty.usbserial-10', 9600, 8, 1, '8E1')
  if modbus.connect
    begin
      value = modbus.read_voltage_a
      puts "Напряжение по фазе А: #{value}"
    ensure
      modbus.disconnect
    end
  end
end

Пример на Python

Реализация основана на библиотеке pymodbus. Код индикативный и не гарантируем работоспобность.


import os
import sys
from collections.abc import Callable

from pymodbus.client import ModbusSerialClient
from pymodbus.exceptions import ModbusException


def env(name: str, default: str) -> str:
    return os.getenv(name, default).strip()


def env_int(name: str, default: int) -> int:
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError as exc:
        raise ValueError(f"{name} must be an integer, got: {raw!r}") from exc


def env_optional_int(name: str) -> int | None:
    raw = os.getenv(name)
    if raw is None or raw.strip() == "":
        return None
    try:
        return int(raw)
    except ValueError as exc:
        raise ValueError(f"{name} must be an integer, got: {raw!r}") from exc


def env_bool(name: str, default: bool) -> bool:
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def frame_hex(data: bytes) -> str:
    return " ".join(f"{byte:02X}" for byte in data)


def regs_hex(registers: list[int]) -> str:
    return " ".join(f"0x{value:04X}" for value in registers)


def packet_tracer(enabled: bool) -> Callable[[bool, bytes], bytes]:
    def _trace(is_tx: bool, data: bytes) -> bytes:
        if enabled:
            direction = "TX" if is_tx else "RX"
            print(f"[FRAME {direction}] {frame_hex(data)}")
        return data

    return _trace


def read_regs(
    client: ModbusSerialClient, slave_id: int, address: int, count: int
) -> list[int]:
    response = client.read_holding_registers(
        address=address,
        count=count,
        device_id=slave_id,
    )
    if response.isError():
        raise ModbusException(f"Read error at 0x{address:04X}: {response}")
    return response.registers


def u32_from_regs(registers: list[int], start_idx: int) -> int:
    return (registers[start_idx] << 16) | registers[start_idx + 1]


def main() -> int:
    # Keep connection params in env vars so this script can be reused
    # across different adapters/devices without code changes.
    port = env("MODBUS_PORT", "/dev/tty.usbserial-10")
    slave_id = env_int("MODBUS_SLAVE_ID", 1)
    baudrate = env_int("MODBUS_BAUDRATE", 9600)
    bytesize = env_int("MODBUS_BYTESIZE", 8)
    stopbits = env_int("MODBUS_STOPBITS", 1)
    timeout = float(env("MODBUS_TIMEOUT", "2.0"))
    # STM1Y default communication setting is even parity.
    parity = env("MODBUS_PARITY", "O")
    ir_to_write = env_optional_int("IR")
    raw_mode = env_bool("MODBUS_RAW", False)
    trace_mode = env_bool("MODBUS_TRACE", True)

    client = ModbusSerialClient(
        port,
        baudrate=baudrate,
        bytesize=bytesize,
        stopbits=stopbits,
        parity=parity,
        timeout=timeout,
        retries=1,
        reconnect_delay=0,
        reconnect_delay_max=0,
        trace_packet=packet_tracer(trace_mode),
    )

    print(
        "Connecting via RTU:",
        f"port={port}, slave={slave_id}, baud={baudrate}, parity={parity}",
        flush=True,
    )
    if not client.connect():
        print("Failed to connect to serial adapter.")
        return 2

    try:
        if ir_to_write is not None:
            if not 100 <= ir_to_write <= 250:
                print("IR must be in range 100..250 when provided.")
                return 3
            write_response = client.write_register(
                address=0x1000,
                value=ir_to_write,
                device_id=slave_id,
            )
            if write_response.isError():
                raise ModbusException(f"Write error at 0x1000: {write_response}")
            print(f"Wrote IR={ir_to_write} to register 0x1000")

        # 0x000B..0x0010: phase+line voltages (U16, scale *10)
        volt_regs = read_regs(client, slave_id=slave_id, address=0x000B, count=6)
        # 0x0012..0x0017: IA/IB/IC (U32, scale *10)
        current_regs = read_regs(client, slave_id=slave_id, address=0x0012, count=6)
        # 0x001E: frequency (U16, scale *10)
        freq_regs = read_regs(client, slave_id=slave_id, address=0x001E, count=1)
        # 0x1000: Ir and Tr
        ir_tr_regs = read_regs(client, slave_id=slave_id, address=0x1000, count=2)

        ua = volt_regs[0] / 10.0
        ub = volt_regs[1] / 10.0
        uc = volt_regs[2] / 10.0
        uab = volt_regs[3] / 10.0
        ubc = volt_regs[4] / 10.0
        uca = volt_regs[5] / 10.0

        ia = u32_from_regs(current_regs, 0) / 10.0
        ib = u32_from_regs(current_regs, 2) / 10.0
        ic = u32_from_regs(current_regs, 4) / 10.0

        freq = freq_regs[0] / 10.0

        ir = ir_tr_regs[0]
        tr = ir_tr_regs[1]

        print("STM1Y decoded values:")
        print(f"  Phase voltage: Ua={ua:.1f} V, Ub={ub:.1f} V, Uc={uc:.1f} V")
        print(f"  Line voltage:  Uab={uab:.1f} V, Ubc={ubc:.1f} V, Uca={uca:.1f} V")
        print(f"  Current:       Ia={ia:.1f} A, Ib={ib:.1f} A, Ic={ic:.1f} A")
        print(f"  Frequency:     F={freq:.1f} Hz")
        print(f"  Ir:            {ir:.1f} A")
        print(f"  Tr:            {tr:.1f} sec")

        return 0
    except (ModbusException, OSError, IndexError) as exc:
        print(f"Connection/read failed: {exc}")
        return 3
    finally:
        client.close()


if __name__ == "__main__":
    raise SystemExit(main())

Пример на Crystal

Этот язык не столь популярен, но вся внутренняя инфраструктура Texenergo выполнена на нём. Приводим его так как здесь требуется более низкоуровневая настройка и он работает без сторонних библиотек. Код индикативный и не гарантируем работоспобность.


require "option_parser"
require "c/fcntl"
require "c/termios"

module ModbusUART
  class Error < Exception
  end

  struct Request
    getter slave_id : UInt8
    getter function : UInt8
    getter address : UInt16
    getter quantity : UInt16

    def initialize(@slave_id : UInt8, @function : UInt8, @address : UInt16, @quantity : UInt16)
    end
  end

  def self.parse_int(value : String) : Int32
    v = value.strip
    (v.starts_with?("0x") || v.starts_with?("0X")) ? v[2..].to_i(16) : v.to_i
  end

  def self.normalize_device(device : String) : String
    return device unless device.starts_with?("/dev/tty.")
    cu_device = device.sub("/dev/tty.", "/dev/cu.")
    File.exists?(cu_device) ? cu_device : device
  end

  def self.crc16_modbus(data : Bytes) : UInt16
    crc = 0xFFFF_u16
    data.each do |byte|
      crc = (crc ^ byte.to_u16).to_u16
      8.times do
        crc = (crc & 0x0001) == 0x0001 ? ((crc >> 1) ^ 0xA001).to_u16 : (crc >> 1).to_u16
      end
    end
    crc
  end

  def self.request_frame(req : Request) : Bytes
    pdu = Bytes[
      req.slave_id,
      req.function,
      ((req.address >> 8) & 0xFF).to_u8,
      (req.address & 0xFF).to_u8,
      ((req.quantity >> 8) & 0xFF).to_u8,
      (req.quantity & 0xFF).to_u8,
    ]
    crc = crc16_modbus(pdu)
    Bytes[pdu[0], pdu[1], pdu[2], pdu[3], pdu[4], pdu[5], (crc & 0xFF).to_u8, ((crc >> 8) & 0xFF).to_u8]
  end

  def self.hex(bytes : Bytes) : String
    bytes.map { |b| b.to_s(16).upcase.rjust(2, '0') }.join(" ")
  end

  def self.open_configured_serial(device : String, baud : Int32, mode : String) : IO::FileDescriptor
    data_bits, parity, stop_bits = parse_mode(mode)
    fd = LibC.open(device, LibC::O_RDWR | LibC::O_NONBLOCK)
    raise Error.new("open failed for #{device}: #{Errno.value.message}") if fd < 0

    current_flags = LibC.fcntl(fd, LibC::F_GETFL)
    if current_flags < 0
      LibC.close(fd)
      raise Error.new("fcntl(F_GETFL) failed: #{Errno.value.message}")
    end

    if LibC.fcntl(fd, LibC::F_SETFL, current_flags & ~LibC::O_NONBLOCK) < 0
      LibC.close(fd)
      raise Error.new("fcntl(F_SETFL) failed: #{Errno.value.message}")
    end

    t = uninitialized LibC::Termios
    if LibC.tcgetattr(fd, pointerof(t)) != 0
      LibC.close(fd)
      raise Error.new("tcgetattr failed: #{Errno.value.message}")
    end

    LibC.cfmakeraw(pointerof(t))

    t.c_cflag &= ~(LibC::CSIZE | LibC::PARENB | LibC::PARODD | LibC::CSTOPB)
    t.c_cflag |= parse_data_bits(data_bits)
    t.c_cflag |= (LibC::CLOCAL | LibC::CREAD)

    case parity
    when "N"
    when "E" then t.c_cflag |= LibC::PARENB
    when "O" then t.c_cflag |= (LibC::PARENB | LibC::PARODD)
    else
      LibC.close(fd)
      raise Error.new("Unsupported parity: #{parity}")
    end

    t.c_cflag |= LibC::CSTOPB if stop_bits == "2"

    speed = parse_baud(baud)
    t.c_ispeed = speed
    t.c_ospeed = speed
    t.c_cc[LibC::VMIN] = 0_u8
    {% if LibC.has_constant?(:VTIME) %}
      t.c_cc[LibC::VTIME] = 5_u8
    {% else %}
      # Darwin bindings don't expose VTIME constant; it follows VMIN.
      t.c_cc[LibC::VMIN + 1] = 5_u8
    {% end %}

    if LibC.tcsetattr(fd, LibC::TCSANOW, pointerof(t)) != 0
      LibC.close(fd)
      raise Error.new("tcsetattr failed: #{Errno.value.message}")
    end

    IO::FileDescriptor.new(fd)
  end

  def self.parse_mode(mode : String) : Tuple(String, String, String)
    match = /^(\d)([NEO])(\d)$/.match(mode.upcase)
    raise Error.new("Invalid mode '#{mode}', expected e.g. 8E1") unless match
    {match[1], match[2], match[3]}
  end

  def self.read_response(io : IO, expected_function : UInt8, timeout : Time::Span) : Bytes
    start = Time.instant
    out = IO::Memory.new
    tmp = Bytes.new(256)
    expected_size = 5

    loop do
      n = io.read(tmp)
      if n > 0
        out.write(tmp[0, n])
        data = out.to_slice

        if data.size >= 3
          if data[1] == (expected_function | 0x80_u8)
            expected_size = 5
          else
            expected_size = data[2] + 5
          end
        end

        break if data.size >= expected_size
      end

      break if (Time.instant - start) >= timeout
      sleep 20.milliseconds
    end

    out.to_slice
  end

  private def self.parse_data_bits(bits : String)
    case bits
    when "5" then LibC::CS5
    when "6" then LibC::CS6
    when "7" then LibC::CS7
    when "8" then LibC::CS8
    else
      raise Error.new("Unsupported data bits: #{bits}")
    end
  end

  private def self.parse_baud(baud : Int32)
    case baud
    when 0 then LibC::B0
    when 50 then LibC::B50
    when 75 then LibC::B75
    when 110 then LibC::B110
    when 134 then LibC::B134
    when 150 then LibC::B150
    when 200 then LibC::B200
    when 300 then LibC::B300
    when 600 then LibC::B600
    when 1200 then LibC::B1200
    when 1800 then LibC::B1800
    when 2400 then LibC::B2400
    when 4800 then LibC::B4800
    when 9600 then LibC::B9600
    when 19200 then LibC::B19200
    when 38400 then LibC::B38400
    else
      raise Error.new("Unsupported baud rate: #{baud}")
    end
  end
end

device = "/dev/tty.usbserial-10"
baud = 9600
mode = "8E1"
slave_id = 1_u8
function = 0x03_u8
address = 0x000B_u16
quantity = 6_u16
timeout_seconds = 1.5

OptionParser.parse do |parser|
  parser.banner = "Usage: crystal run scripts/modbus_uart.cr -- [options]"
  parser.on("--device DEVICE", "Serial device (default: #{device})") { |v| device = v }
  parser.on("--baud BAUD", "Baud rate (default: #{baud})") { |v| baud = v.to_i }
  parser.on("--mode MODE", "UART mode 8N1/8E1/8O1 (default: #{mode})") { |v| mode = v }
  parser.on("--slave ID", "Modbus slave id (default: #{slave_id})") { |v| slave_id = v.to_u8 }
  parser.on("--function CODE", "Function dec/hex (default: 0x03)") { |v| function = ModbusUART.parse_int(v).to_u8 }
  parser.on("--address ADDR", "Start address dec/hex (default: 0x000B)") { |v| address = ModbusUART.parse_int(v).to_u16 }
  parser.on("--quantity N", "Quantity (default: #{quantity})") { |v| quantity = v.to_i.to_u16 }
  parser.on("--timeout SEC", "Read timeout seconds (default: #{timeout_seconds})") { |v| timeout_seconds = v.to_f64 }
  parser.on("-h", "--help", "Show this help") { puts parser; exit(0) }
end

device = ModbusUART.normalize_device(device)
request = ModbusUART::Request.new(slave_id: slave_id, function: function, address: address, quantity: quantity)
frame = ModbusUART.request_frame(request)

begin
  io = ModbusUART.open_configured_serial(device, baud, mode)
  puts "Device : #{device}"
  puts "Request: #{ModbusUART.hex(frame)}"

  begin
    io.read_timeout = timeout_seconds.seconds
    io.write(frame)
    io.flush

    response = ModbusUART.read_response(io, function, timeout_seconds.seconds)
    raise ModbusUART::Error.new("No response received") if response.empty?

    puts "Response: #{ModbusUART.hex(response)}"

    body = response[0, response.size - 2]
    recv_crc = ((response[-1].to_u16 << 8) | response[-2].to_u16).to_u16
    calc_crc = ModbusUART.crc16_modbus(body)
    raise ModbusUART::Error.new("CRC mismatch: recv=0x#{recv_crc.to_s(16).upcase} calc=0x#{calc_crc.to_s(16).upcase}") if recv_crc != calc_crc

    if response[1] == (function | 0x80_u8)
      raise ModbusUART::Error.new("Modbus exception 0x#{response[2].to_s(16).upcase}")
    end

    if function.in? 0x03_u8, 0x04_u8
      regs = [] of UInt16
      i = 0
      while i < response[2]
        regs << (((response[3 + i].to_u16 << 8) | response[4 + i].to_u16).to_u16)
        i += 2
      end
      puts "Registers: #{regs.join(", ")}"
    end
  ensure
    io.close
  end
rescue ex
  STDERR.puts "Error: #{ex.message}"
  exit(1)
end