Extra functions

Pymodbus: Modbus Protocol Implementation.

Released under the BSD license

class pymodbus.ExceptionResponse(function_code: int, exception_code: int = 0, device_id: int = 1, transaction: int = 0)

Bases: ModbusPDU

Base class for a modbus exception PDU.

decode(data: bytes) None

Decode a modbus exception response.

encode() bytes

Encode a modbus exception response.

rtu_frame_size: int = 5
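
As a rough illustration of the wire format behind encode() and decode(), the sketch below builds an exception PDU with the standard library only. It assumes the usual Modbus convention that the function code is sent with its high bit (0x80) set, followed by one exception code byte. The helper names are hypothetical and are not part of pymodbus.

```python
import struct

EXCEPTION_FLAG = 0x80  # high bit marks an exception response


def encode_exception_pdu(function_code: int, exception_code: int) -> bytes:
    """Build a 2-byte exception PDU: (function code | 0x80), exception code."""
    return struct.pack(">BB", function_code | EXCEPTION_FLAG, exception_code)


def decode_exception_pdu(pdu: bytes) -> tuple[int, int]:
    """Return (original function code, exception code) from an exception PDU."""
    flagged_code, exception_code = struct.unpack(">BB", pdu[:2])
    return flagged_code & ~EXCEPTION_FLAG & 0xFF, exception_code


pdu = encode_exception_pdu(function_code=3, exception_code=2)
print(pdu.hex())  # 8302
```

Under that layout an RTU frame would carry 1 device id byte, 2 PDU bytes and 2 CRC bytes, which is consistent with rtu_frame_size = 5.
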
class pymodbus.FramerType(*values)

Bases: str, Enum

Type of Modbus frame.

ASCII = 'ascii'
RTU = 'rtu'
SOCKET = 'socket'
TLS = 'tls'
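
Because the class derives from both str and Enum, its members behave like plain strings. The sketch below re-declares the documented members locally (it does not import pymodbus) to show what that mix-in allows.

```python
from enum import Enum


class FramerType(str, Enum):
    """Local stand-in mirroring the documented members (not imported from pymodbus)."""

    ASCII = "ascii"
    RTU = "rtu"
    SOCKET = "socket"
    TLS = "tls"


# Members compare equal to their string values because of the str mix-in.
print(FramerType.RTU == "rtu")
# Lookup by value returns the member itself, which is handy for config files.
print(FramerType("tls") is FramerType.TLS)
```
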
class pymodbus.ModbusDeviceIdentification(info=None, info_name=None)

Bases: object

This is used to supply the device identification for the readDeviceIdentification function.

For more information read section 6.21 of the modbus application protocol.

property MajorMinorRevision
property ModelName
property ProductCode
property ProductName
property UserApplicationName
property VendorName
property VendorUrl
stat_data = {0: '', 1: '', 2: '', 3: '', 4: '', 5: '', 6: '', 7: '', 8: ''}
summary()

Return a summary of the main items.

Returns:

A dictionary of the main items

update(value)

Update the values of this identity, using another identity as the value.

Parameters:

value – The value to copy values from

exception pymodbus.ModbusException(string)

Bases: Exception

Base modbus exception.

isError()

Indicate that this object represents an error condition.

pymodbus.pymodbus_apply_logging_config(level: str | int = 10, log_file_name: str | None = None)

Apply basic logging configuration used by default by Pymodbus maintainers.

Parameters:
  • level – (optional) log level to set; if not set, it is inherited.

  • log_file_name – (optional) name of a file to log to in addition

Please call this function to format logging appropriately when opening issues.

Pymodbus Exceptions.

Custom exceptions to be used in the Modbus code.

exception pymodbus.exceptions.ConnectionException(string='')

Bases: ModbusException

Error resulting from a bad connection.

exception pymodbus.exceptions.MessageRegisterException(string='')

Bases: ModbusException

Error resulting from failing to register a custom message request/response.

exception pymodbus.exceptions.ModbusIOException(string='', function_code=None)

Bases: ModbusException

Error resulting from data i/o.

exception pymodbus.exceptions.NoSuchIdException(string='')

Bases: ModbusException

Error resulting from making a request to an id that does not exist.

exception pymodbus.exceptions.NotImplementedException(string='')

Bases: ModbusException

Error resulting from a function that is not implemented.

exception pymodbus.exceptions.ParameterException(string='')

Bases: ModbusException

Error resulting from invalid parameter.

Transaction.

class pymodbus.transaction.TransactionManager(params: CommParams, framer: FramerBase, retries: int, is_server: bool, trace_packet: Callable[[bool, bytes], bytes] | None, trace_pdu: Callable[[bool, ModbusPDU], ModbusPDU] | None, trace_connect: Callable[[bool], None] | None, sync_client=None)

Bases: ModbusProtocol

Transaction manager.

This is the central class of the library, providing a separation between API and communication:
  • clients/servers call the manager to execute requests/responses

  • transport/framer/pdu is used by the manager to communicate with the devices

Transaction manager handles:
  • Execution of requests (client), with retries and locking

  • Sending of responses (server), with retries

  • Connection management (on top of what transport offers)

  • No response (temporarily) from a device

Transaction manager offers:
  • a simple execute interface for requests (client)

  • a simple send interface for responses (server)

  • external trace methods tracing outgoing/incoming packets/PDUs (byte stream)
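
The "retries and locking" behaviour described above can be pictured with a small asyncio sketch. This is not the pymodbus implementation: execute_with_retries and the send callable are hypothetical, and the sketch assumes that retries counts the extra attempts made after the first one.

```python
import asyncio


async def execute_with_retries(send, lock: asyncio.Lock, retries: int, timeout: float):
    """Call the hypothetical coroutine factory `send` until it answers in time.

    The lock ensures only one request is in flight at a time.
    """
    async with lock:
        for attempt in range(retries + 1):
            try:
                return await asyncio.wait_for(send(attempt), timeout)
            except asyncio.TimeoutError:
                continue  # no response in time: try again
    raise TimeoutError(f"no response after {retries} retries")


async def demo() -> str:
    async def flaky_send(attempt: int) -> str:
        # Simulated device: the first attempt hangs, the second answers.
        if attempt == 0:
            await asyncio.sleep(1)
        return f"response on attempt {attempt}"

    return await execute_with_retries(flaky_send, asyncio.Lock(), retries=3, timeout=0.05)


result = asyncio.run(demo())
print(result)
```

Holding the lock for the whole retry loop keeps a second request from being interleaved with the retries of the first, which matters on transports that can only carry one outstanding request.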

callback_connected() None

Call when connection is successful.

callback_data(data: bytes, addr: tuple | None = None) int

Handle received data.

callback_disconnected(exc: Exception | None) None

Call when connection is lost.

callback_new_connection()

Call when the listener receives a new connection request.

dummy_trace_connect(connect: bool) None

Do dummy trace.

dummy_trace_packet(sending: bool, data: bytes) bytes

Do dummy trace.

dummy_trace_pdu(sending: bool, pdu: ModbusPDU) ModbusPDU

Do dummy trace.

async execute(no_response_expected: bool, request: ModbusPDU) ModbusPDU

Execute requests asynchronously.

REMARK: this method is identical to sync_execute, apart from the lock and try/except.

any changes in either method MUST be mirrored !!!

getNextTID() int

Retrieve the next transaction identifier.
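
A transaction identifier generator can be as small as a wrapping counter. The sketch below is an assumption about the general technique, not a copy of getNextTID: it supposes a two-byte identifier (as in the Modbus TCP header) and restarts at 1 after 0xFFFF. The class name is hypothetical.

```python
class TransactionIdCounter:
    """Hypothetical counter; not the pymodbus implementation."""

    MAX_TID = 0xFFFF  # the identifier is assumed to occupy two bytes

    def __init__(self) -> None:
        self._tid = 0

    def next(self) -> int:
        """Return the next identifier, wrapping back to 1 after MAX_TID."""
        self._tid = self._tid + 1 if self._tid < self.MAX_TID else 1
        return self._tid


counter = TransactionIdCounter()
first, second = counter.next(), counter.next()
print(first, second)
```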

pdu_send(pdu: ModbusPDU, addr: tuple | None = None) None

Build byte stream and send.

sync_execute(no_response_expected: bool, request: ModbusPDU) ModbusPDU

Execute requests synchronously.

REMARK: this method is identical to execute, apart from the lock and sync_receive.

any changes in either method MUST be mirrored !!!

sync_get_response(dev_id, tid) ModbusPDU

Receive until PDU is correct or timeout.

Modbus Utilities.

A collection of utilities for packing data, unpacking data, computing checksums, and decoding checksums.
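
To make "computing checksums" concrete, here is a stand-alone sketch of the CRC-16 used by Modbus RTU framing (reflected polynomial 0xA001, initial value 0xFFFF). The function names are hypothetical and are not claimed to exist in pymodbus.utilities.

```python
def crc16_modbus(data: bytes) -> int:
    """Compute CRC-16/MODBUS: reflected polynomial 0xA001, initial value 0xFFFF."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc


def append_crc(frame: bytes) -> bytes:
    """Append the checksum low byte first, as RTU framing does."""
    return frame + crc16_modbus(frame).to_bytes(2, "little")


def crc_is_valid(frame_with_crc: bytes) -> bool:
    """A frame with a correct trailing CRC has a remainder of zero."""
    return crc16_modbus(frame_with_crc) == 0


frame = append_crc(b"\x01\x03\x00\x00\x00\x01")
print(frame.hex(), crc_is_valid(frame))
```

Checking for a zero remainder avoids slicing the CRC off the frame before validating it.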

pymodbus.utilities.dict_property(store, index)

Create class properties from a dictionary.

Basically this allows you to remove a lot of possible boilerplate code.

Parameters:
  • store – The store to pull from

  • index – The index into the store to close over

Returns:

An initialized property set
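
The idea of closing over an index to build a property can be sketched as follows. This is a simplified stand-in, not the library function: dict_property_sketch only accepts the name of an instance attribute holding the dictionary, and the Identity class is a toy.

```python
def dict_property_sketch(store_name: str, index: int) -> property:
    """Build a property that reads and writes `getattr(self, store_name)[index]`."""

    def getter(self):
        return getattr(self, store_name)[index]

    def setter(self, value):
        getattr(self, store_name)[index] = value

    return property(getter, setter)


class Identity:
    """Toy class; the index-to-name mapping here is only an illustration."""

    def __init__(self) -> None:
        self._data = {0: "", 1: ""}

    VendorName = dict_property_sketch("_data", 0)
    ProductCode = dict_property_sketch("_data", 1)


identity = Identity()
identity.VendorName = "Example Corp"
print(identity.VendorName, identity._data)
```

Each named attribute costs one line instead of a getter/setter pair, which is the boilerplate saving the description refers to.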

pymodbus.utilities.hexlify_packets(packet)

Return hex representation of bytestring received.

Parameters:

packet – The bytestring to convert

Returns:

The hex representation of the bytestring

PDU classes

Modbus Request/Response Decoders.

class pymodbus.pdu.decoders.DecodePDU(is_server: bool)

Bases: object

Decode pdu requests/responses (server/client).

classmethod add_pdu(req: type[ModbusPDU], resp: type[ModbusPDU])

Register request/response.

classmethod add_sub_pdu(req: type[ModbusPDU], resp: type[ModbusPDU])

Register request/response.

decode(frame: bytes) ModbusPDU | None

Decode a frame.

list_function_codes()

Return list of function codes.

lookupPduClass(data: bytes) type[ModbusPDU] | None

Use function_code to determine the class of the PDU.

pdu_sub_table: dict[int, dict[int, tuple[type[ModbusPDU], type[ModbusPDU]]]] = {8: {0: (<class 'pymodbus.pdu.diag_message.ReturnQueryDataRequest'>, <class 'pymodbus.pdu.diag_message.ReturnQueryDataResponse'>), 1: (<class 'pymodbus.pdu.diag_message.RestartCommunicationsOptionRequest'>, <class 'pymodbus.pdu.diag_message.RestartCommunicationsOptionResponse'>), 2: (<class 'pymodbus.pdu.diag_message.ReturnDiagnosticRegisterRequest'>, <class 'pymodbus.pdu.diag_message.ReturnDiagnosticRegisterResponse'>), 3: (<class 'pymodbus.pdu.diag_message.ChangeAsciiInputDelimiterRequest'>, <class 'pymodbus.pdu.diag_message.ChangeAsciiInputDelimiterResponse'>), 4: (<class 'pymodbus.pdu.diag_message.ForceListenOnlyModeRequest'>, <class 'pymodbus.pdu.diag_message.ForceListenOnlyModeResponse'>), 10: (<class 'pymodbus.pdu.diag_message.ClearCountersRequest'>, <class 'pymodbus.pdu.diag_message.ClearCountersResponse'>), 11: (<class 'pymodbus.pdu.diag_message.ReturnBusMessageCountRequest'>, <class 'pymodbus.pdu.diag_message.ReturnBusMessageCountResponse'>), 12: (<class 'pymodbus.pdu.diag_message.ReturnBusCommunicationErrorCountRequest'>, <class 'pymodbus.pdu.diag_message.ReturnBusCommunicationErrorCountResponse'>), 13: (<class 'pymodbus.pdu.diag_message.ReturnBusExceptionErrorCountRequest'>, <class 'pymodbus.pdu.diag_message.ReturnBusExceptionErrorCountResponse'>), 14: (<class 'pymodbus.pdu.diag_message.ReturnDeviceMessageCountRequest'>, <class 'pymodbus.pdu.diag_message.ReturnDeviceMessageCountResponse'>), 15: (<class 'pymodbus.pdu.diag_message.ReturnDeviceNoResponseCountRequest'>, <class 'pymodbus.pdu.diag_message.ReturnDeviceNoResponseCountResponse'>), 16: (<class 'pymodbus.pdu.diag_message.ReturnDeviceNAKCountRequest'>, <class 'pymodbus.pdu.diag_message.ReturnDeviceNAKCountResponse'>), 17: (<class 'pymodbus.pdu.diag_message.ReturnDeviceBusyCountRequest'>, <class 'pymodbus.pdu.diag_message.ReturnDeviceBusyCountResponse'>), 18: (<class 
'pymodbus.pdu.diag_message.ReturnDeviceBusCharacterOverrunCountRequest'>, <class 'pymodbus.pdu.diag_message.ReturnDeviceBusCharacterOverrunCountResponse'>), 19: (<class 'pymodbus.pdu.diag_message.ReturnIopOverrunCountRequest'>, <class 'pymodbus.pdu.diag_message.ReturnIopOverrunCountResponse'>), 20: (<class 'pymodbus.pdu.diag_message.ClearOverrunCountRequest'>, <class 'pymodbus.pdu.diag_message.ClearOverrunCountResponse'>), 21: (<class 'pymodbus.pdu.diag_message.GetClearModbusPlusRequest'>, <class 'pymodbus.pdu.diag_message.GetClearModbusPlusResponse'>)}, 43: {14: (<class 'pymodbus.pdu.mei_message.ReadDeviceInformationRequest'>, <class 'pymodbus.pdu.mei_message.ReadDeviceInformationResponse'>)}}
pdu_table: dict[int, tuple[type[ModbusPDU], type[ModbusPDU]]] = {1: (<class 'pymodbus.pdu.bit_message.ReadCoilsRequest'>, <class 'pymodbus.pdu.bit_message.ReadCoilsResponse'>), 2: (<class 'pymodbus.pdu.bit_message.ReadDiscreteInputsRequest'>, <class 'pymodbus.pdu.bit_message.ReadDiscreteInputsResponse'>), 3: (<class 'pymodbus.pdu.register_message.ReadHoldingRegistersRequest'>, <class 'pymodbus.pdu.register_message.ReadHoldingRegistersResponse'>), 4: (<class 'pymodbus.pdu.register_message.ReadInputRegistersRequest'>, <class 'pymodbus.pdu.register_message.ReadInputRegistersResponse'>), 5: (<class 'pymodbus.pdu.bit_message.WriteSingleCoilRequest'>, <class 'pymodbus.pdu.bit_message.WriteSingleCoilResponse'>), 6: (<class 'pymodbus.pdu.register_message.WriteSingleRegisterRequest'>, <class 'pymodbus.pdu.register_message.WriteSingleRegisterResponse'>), 7: (<class 'pymodbus.pdu.other_message.ReadExceptionStatusRequest'>, <class 'pymodbus.pdu.other_message.ReadExceptionStatusResponse'>), 8: (<class 'pymodbus.pdu.diag_message.DiagnosticBase'>, <class 'pymodbus.pdu.diag_message.DiagnosticBase'>), 11: (<class 'pymodbus.pdu.other_message.GetCommEventCounterRequest'>, <class 'pymodbus.pdu.other_message.GetCommEventCounterResponse'>), 12: (<class 'pymodbus.pdu.other_message.GetCommEventLogRequest'>, <class 'pymodbus.pdu.other_message.GetCommEventLogResponse'>), 15: (<class 'pymodbus.pdu.bit_message.WriteMultipleCoilsRequest'>, <class 'pymodbus.pdu.bit_message.WriteMultipleCoilsResponse'>), 16: (<class 'pymodbus.pdu.register_message.WriteMultipleRegistersRequest'>, <class 'pymodbus.pdu.register_message.WriteMultipleRegistersResponse'>), 17: (<class 'pymodbus.pdu.other_message.ReportDeviceIdRequest'>, <class 'pymodbus.pdu.other_message.ReportDeviceIdResponse'>), 20: (<class 'pymodbus.pdu.file_message.ReadFileRecordRequest'>, <class 'pymodbus.pdu.file_message.ReadFileRecordResponse'>), 21: (<class 'pymodbus.pdu.file_message.WriteFileRecordRequest'>, <class 
'pymodbus.pdu.file_message.WriteFileRecordResponse'>), 22: (<class 'pymodbus.pdu.register_message.MaskWriteRegisterRequest'>, <class 'pymodbus.pdu.register_message.MaskWriteRegisterResponse'>), 23: (<class 'pymodbus.pdu.register_message.ReadWriteMultipleRegistersRequest'>, <class 'pymodbus.pdu.register_message.ReadWriteMultipleRegistersResponse'>), 24: (<class 'pymodbus.pdu.file_message.ReadFifoQueueRequest'>, <class 'pymodbus.pdu.file_message.ReadFifoQueueResponse'>), 43: (<class 'pymodbus.pdu.mei_message.ReadDeviceInformationRequest'>, <class 'pymodbus.pdu.mei_message.ReadDeviceInformationResponse'>)}
register(custom_class: type[ModbusPDU]) None

Register a function and sub function class with the decoder.
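
The two tables listed for this class suggest a two-level lookup: first by function code, then by sub function code for the codes that have one. The sketch below illustrates that idea with strings standing in for the PDU classes. It is an assumption about the general approach, it ignores exception responses, and lookup_pdu_name is a hypothetical name.

```python
# Stand-in tables: strings replace the real PDU classes for brevity.
PDU_TABLE = {1: "ReadCoils", 3: "ReadHoldingRegisters", 8: "DiagnosticBase"}
PDU_SUB_TABLE = {
    8: {0: "ReturnQueryData", 10: "ClearCounters"},
    43: {14: "ReadDeviceInformation"},
}
# Width of the sub-function code per function code (2 bytes for 8, 1 byte for 43).
SUB_CODE_WIDTH = {8: 2, 43: 1}


def lookup_pdu_name(frame: bytes):
    """Return the stand-in name for a PDU, or None when nothing is registered."""
    if not frame:
        return None
    function_code = frame[0]
    if function_code in PDU_SUB_TABLE:
        width = SUB_CODE_WIDTH[function_code]
        sub_code = int.from_bytes(frame[1 : 1 + width], "big")
        # Fall back to the main table when the sub-function is unknown.
        return PDU_SUB_TABLE[function_code].get(sub_code, PDU_TABLE.get(function_code))
    return PDU_TABLE.get(function_code)


print(lookup_pdu_name(b"\x08\x00\x0a\x00\x00"))
print(lookup_pdu_name(b"\x2b\x0e\x01\x00"))
```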

Bit Reading Request/Response messages.

class pymodbus.pdu.bit_message.ReadCoilsRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

ReadCoilsRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Run request against a datastore.

decode(data: bytes) None

Decode a request pdu.

encode() bytes

Encode a request pdu.

function_code: int = 1
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Byte Count (1 byte) + Quantity of Coils / 8 (N bytes); if the remainder is not 0, then N = N + 1.
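
The size rule above amounts to a ceiling division. A minimal sketch, with a hypothetical function name:

```python
def read_coils_response_pdu_size(count: int) -> int:
    """Function code (1) + byte count (1) + ceil(count / 8) data bytes."""
    data_bytes = (count + 7) // 8  # integer ceiling of count / 8
    return 1 + 1 + data_bytes


for coils in (1, 8, 9, 16, 17):
    print(coils, read_coils_response_pdu_size(coils))
```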

rtu_frame_size: int = 8
class pymodbus.pdu.bit_message.ReadCoilsResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

ReadCoilsResponse.

decode(data: bytes) None

Decode response pdu.

encode() bytes

Encode response pdu.

function_code: int = 1
rtu_byte_count_pos: int = 2
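
The data part of a coil response packs eight coils per byte. The sketch below assumes the standard Modbus ordering, where the first coil occupies the least significant bit of the first byte. The helper names are hypothetical and the library's internal helpers are not shown.

```python
def pack_bits(bits: list[bool]) -> bytes:
    """Pack booleans into bytes, first coil in the least significant bit."""
    packed = bytearray((len(bits) + 7) // 8)
    for position, bit in enumerate(bits):
        if bit:
            packed[position // 8] |= 1 << (position % 8)
    return bytes(packed)


def unpack_bits(data: bytes, count: int) -> list[bool]:
    """Inverse of pack_bits; `count` drops the padding bits of the last byte."""
    return [bool(data[i // 8] >> (i % 8) & 1) for i in range(count)]


coils = [True, False, True, True, False, False, False, False, True]
payload = pack_bits(coils)
print(payload.hex())  # 0d01
```

The wire format does not record how many bits of the last byte are meaningful, so a decoder needs the requested count to discard the padding.
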
class pymodbus.pdu.bit_message.ReadDiscreteInputsRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ReadCoilsRequest

ReadDiscreteInputsRequest.

function_code: int = 2
class pymodbus.pdu.bit_message.ReadDiscreteInputsResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ReadCoilsResponse

ReadDiscreteInputsResponse.

function_code: int = 2
class pymodbus.pdu.bit_message.WriteMultipleCoilsRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

WriteMultipleCoilsRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Run a request against a datastore.

decode(data: bytes) None

Decode a write coils request.

encode() bytes

Encode write coils request.

function_code: int = 15
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Output Address (2 bytes) + Quantity of Outputs (2 bytes)

rtu_byte_count_pos: int = 6
class pymodbus.pdu.bit_message.WriteMultipleCoilsResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

WriteMultipleCoilsResponse.

decode(data: bytes) None

Decode a write coils response.

encode() bytes

Encode write coils response.

function_code: int = 15
rtu_frame_size: int = 8
class pymodbus.pdu.bit_message.WriteSingleCoilRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: WriteSingleCoilResponse

WriteSingleCoilRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Run a request against a datastore.

get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Output Address (2 bytes) + Output Value (2 bytes)
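
The five bytes listed above can be produced with struct. The sketch assumes the values the Modbus specification defines for a single coil (0xFF00 for on, 0x0000 for off); encode_write_single_coil is a hypothetical helper, not a pymodbus function.

```python
import struct

COIL_ON = 0xFF00   # value the Modbus specification uses for "on"
COIL_OFF = 0x0000  # value used for "off"


def encode_write_single_coil(address: int, value: bool) -> bytes:
    """Function code 5, 2-byte address, 2-byte output value (big-endian)."""
    return struct.pack(">BHH", 5, address, COIL_ON if value else COIL_OFF)


pdu = encode_write_single_coil(address=0x00AC, value=True)
print(pdu.hex(), len(pdu))  # 0500acff00 5
```

Adding one device id byte and two CRC bytes to these five gives the documented rtu_frame_size of 8.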

class pymodbus.pdu.bit_message.WriteSingleCoilResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

WriteSingleCoilResponse.

decode(data: bytes) None

Decode a write coil request.

encode() bytes

Encode write coil request.

function_code: int = 5
rtu_frame_size: int = 8

Diagnostic Record Read/Write.

class pymodbus.pdu.diag_message.ChangeAsciiInputDelimiterRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ChangeAsciiInputDelimiterRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

sub_function_code: int = 3
class pymodbus.pdu.diag_message.ChangeAsciiInputDelimiterResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ChangeAsciiInputDelimiterResponse.

sub_function_code: int = 3
class pymodbus.pdu.diag_message.ClearCountersRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ClearCountersRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

sub_function_code: int = 10
class pymodbus.pdu.diag_message.ClearCountersResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ClearCountersResponse.

sub_function_code: int = 10
class pymodbus.pdu.diag_message.ClearOverrunCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ClearOverrunCountRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

sub_function_code: int = 20
class pymodbus.pdu.diag_message.ClearOverrunCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ClearOverrunCountResponse.

sub_function_code: int = 20
class pymodbus.pdu.diag_message.DiagnosticBase(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

DiagnosticBase.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Implement dummy.

decode(data: bytes) None

Decode a diagnostic request.

classmethod decode_sub_function_code(data: bytes) int

Decode sub function code (2 bytes).

encode() bytes

Encode a diagnostic response.

function_code: int = 8
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Sub function code (2 byte) + Data (2 * N bytes)

message: bytes | int | list | tuple | None
rtu_frame_size: int = 8
sub_function_code: int = 9999
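
The layout given for the diagnostic PDU (function code, 2-byte sub function code, then data in 2-byte words) can be sketched with struct. The helpers below are hypothetical stand-ins and do not reproduce how the library handles the different message types.

```python
import struct


def encode_diagnostic_pdu(sub_function_code: int, data: bytes = b"\x00\x00") -> bytes:
    """Function code 8, 2-byte sub-function code, then 2 * N data bytes."""
    if len(data) % 2:
        raise ValueError("diagnostic data is expected in 2-byte words")
    return struct.pack(">BH", 8, sub_function_code) + data


def decode_sub_function_code(pdu: bytes) -> int:
    """Read the 2-byte sub-function code that follows the function code."""
    (sub_function_code,) = struct.unpack(">H", pdu[1:3])
    return sub_function_code


echo = encode_diagnostic_pdu(0, b"\x12\x34")  # sub-function 0: return query data
print(echo.hex(), decode_sub_function_code(echo))
```

With one data word the PDU is 5 bytes, so an RTU frame would be 8 bytes with the device id and CRC, matching rtu_frame_size above.
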
class pymodbus.pdu.diag_message.ForceListenOnlyModeRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ForceListenOnlyModeRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

sub_function_code: int = 4
class pymodbus.pdu.diag_message.ForceListenOnlyModeResponse(dev_id=1, transaction_id=0)

Bases: DiagnosticBase

ForceListenOnlyModeResponse.

This does not send a response.

sub_function_code: int = 4
class pymodbus.pdu.diag_message.GetClearModbusPlusRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

GetClearModbusPlusRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

encode()

Encode a diagnostic response.

get_response_pdu_size()

Return size of the response.

Func_code (1 byte) + Sub function code (2 bytes) + Operation (2 bytes) + Data (108 bytes)

sub_function_code: int = 21
class pymodbus.pdu.diag_message.GetClearModbusPlusResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

GetClearModbusPlusResponse.

sub_function_code: int = 21
class pymodbus.pdu.diag_message.RestartCommunicationsOptionRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

RestartCommunicationsOptionRequest.

sub_function_code: int = 1
class pymodbus.pdu.diag_message.RestartCommunicationsOptionResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

RestartCommunicationsOptionResponse.

sub_function_code: int = 1
class pymodbus.pdu.diag_message.ReturnBusCommunicationErrorCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnBusCommunicationErrorCountRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

sub_function_code: int = 12
class pymodbus.pdu.diag_message.ReturnBusCommunicationErrorCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnBusCommunicationErrorCountResponse.

sub_function_code: int = 12
class pymodbus.pdu.diag_message.ReturnBusExceptionErrorCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnBusExceptionErrorCountRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

sub_function_code: int = 13
class pymodbus.pdu.diag_message.ReturnBusExceptionErrorCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnBusExceptionErrorCountResponse.

sub_function_code: int = 13
class pymodbus.pdu.diag_message.ReturnBusMessageCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnBusMessageCountRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

sub_function_code: int = 11
class pymodbus.pdu.diag_message.ReturnBusMessageCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnBusMessageCountResponse.

sub_function_code: int = 11
class pymodbus.pdu.diag_message.ReturnDeviceBusCharacterOverrunCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnDeviceBusCharacterOverrunCountRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

sub_function_code: int = 18
class pymodbus.pdu.diag_message.ReturnDeviceBusCharacterOverrunCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnDeviceBusCharacterOverrunCountResponse.

sub_function_code: int = 18
class pymodbus.pdu.diag_message.ReturnDeviceBusyCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnDeviceBusyCountRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

sub_function_code: int = 17
class pymodbus.pdu.diag_message.ReturnDeviceBusyCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnDeviceBusyCountResponse.

sub_function_code: int = 17
class pymodbus.pdu.diag_message.ReturnDeviceMessageCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnDeviceMessageCountRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

sub_function_code: int = 14
class pymodbus.pdu.diag_message.ReturnDeviceMessageCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnDeviceMessageCountResponse.

sub_function_code: int = 14
class pymodbus.pdu.diag_message.ReturnDeviceNAKCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnDeviceNAKCountRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

sub_function_code: int = 16
class pymodbus.pdu.diag_message.ReturnDeviceNAKCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnDeviceNAKCountResponse.

sub_function_code: int = 16
class pymodbus.pdu.diag_message.ReturnDeviceNoResponseCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnDeviceNoResponseCountRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

sub_function_code: int = 15
class pymodbus.pdu.diag_message.ReturnDeviceNoResponseCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnDeviceNoResponseCountResponse.

sub_function_code: int = 15
class pymodbus.pdu.diag_message.ReturnDiagnosticRegisterRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnDiagnosticRegisterRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

sub_function_code: int = 2
class pymodbus.pdu.diag_message.ReturnDiagnosticRegisterResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnDiagnosticRegisterResponse.

sub_function_code: int = 2
class pymodbus.pdu.diag_message.ReturnIopOverrunCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnIopOverrunCountRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

sub_function_code: int = 19
class pymodbus.pdu.diag_message.ReturnIopOverrunCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnIopOverrunCountResponse.

sub_function_code: int = 19
class pymodbus.pdu.diag_message.ReturnQueryDataRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnQueryDataRequest.

sub_function_code: int = 0
class pymodbus.pdu.diag_message.ReturnQueryDataResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnQueryDataResponse.

sub_function_code: int = 0

File Record Read/Write Messages.

class pymodbus.pdu.file_message.FileRecord(file_number: int = 0, record_number: int = 0, record_data: bytes = b'', record_length: int = 0)

Bases: object

Represents a file record and its relevant data.

file_number: int = 0
record_data: bytes = b''
record_length: int = 0
record_number: int = 0
class pymodbus.pdu.file_message.ReadFifoQueueRequest(address: int = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReadFifoQueueRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

decode(data: bytes) None

Decode the incoming request.

encode() bytes

Encode the request packet.

function_code: int = 24
rtu_frame_size: int = 6
class pymodbus.pdu.file_message.ReadFifoQueueResponse(values: list[int] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReadFifoQueueResponse.

classmethod calculateRtuFrameSize(data: bytes) int

Calculate the size of the message.

decode(data: bytes) None

Decode the response.

encode() bytes

Encode the response.

function_code: int = 24
class pymodbus.pdu.file_message.ReadFileRecordRequest(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReadFileRecordRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

decode(data: bytes) None

Decode the incoming request.

encode() bytes

Encode the request packet.

function_code: int = 20
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Quantity of records (each 7 bytes)

records: list[FileRecord]
rtu_byte_count_pos: int = 2
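
The "7 bytes per record" figure comes from the sub-request layout in the Modbus specification: a reference type byte followed by three 2-byte fields. The sketch below encodes a request under that assumption. RecordRef and encode_read_file_record are hypothetical names, not the library's FileRecord or encode().

```python
import struct
from dataclasses import dataclass

REFERENCE_TYPE = 6  # fixed reference type defined by the Modbus specification


@dataclass
class RecordRef:
    """Hypothetical stand-in for the addressing fields of a file record."""

    file_number: int
    record_number: int
    record_length: int  # length in 16-bit registers


def encode_read_file_record(records: list[RecordRef]) -> bytes:
    """Function code 20, byte count, then one 7-byte sub-request per record."""
    body = b"".join(
        struct.pack(">BHHH", REFERENCE_TYPE, r.file_number, r.record_number, r.record_length)
        for r in records
    )
    return struct.pack(">BB", 20, len(body)) + body


request = encode_read_file_record([RecordRef(4, 1, 2), RecordRef(3, 9, 2)])
print(request.hex())
```

In an RTU frame the device id is byte 0 and the function code is byte 1, so the byte count lands at position 2, in line with rtu_byte_count_pos.
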
class pymodbus.pdu.file_message.ReadFileRecordResponse(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReadFileRecordResponse.

decode(data: bytes) None

Decode the response.

encode() bytes

Encode the response.

function_code: int = 20
records: list[FileRecord]
rtu_byte_count_pos: int = 2
class pymodbus.pdu.file_message.WriteFileRecordRequest(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

WriteFileRecordRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

decode(data: bytes) None

Decode the incoming request.

encode() bytes

Encode the request packet.

function_code: int = 21
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Quantity of records (each 7 bytes)

records: list[FileRecord]
rtu_byte_count_pos: int = 2
class pymodbus.pdu.file_message.WriteFileRecordResponse(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

The normal response is an echo of the request.

decode(data: bytes) None

Decode the incoming request.

encode() bytes

Encode the response.

function_code: int = 21
records: list[FileRecord]
rtu_byte_count_pos: int = 2

Encapsulated Interface (MEI) Transport Messages.

class pymodbus.pdu.mei_message.ReadDeviceInformationRequest(read_code: int | None = None, object_id: int = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReadDeviceInformationRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

decode(data: bytes) None

Decode data part of the message.

classmethod decode_sub_function_code(data: bytes) int

Decode sub function code (1 byte).

encode() bytes

Encode the request packet.

function_code: int = 43
rtu_frame_size: int = 7
sub_function_code: int = 14
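
A request PDU for this message is four bytes long: function code 43, MEI type 14, the read code and the object id. The helper below is a hypothetical sketch of that layout and not the library's encode().

```python
import struct


def encode_read_device_information(read_code: int, object_id: int) -> bytes:
    """Function code 43, MEI type 14, read device id code, object id."""
    return struct.pack(">BBBB", 43, 14, read_code, object_id)


# Read code 1 requests the basic identification objects, starting at object 0.
request = encode_read_device_information(read_code=1, object_id=0)
print(request.hex(), len(request))  # 2b0e0100 4
```

One device id byte plus these four bytes plus a two-byte CRC gives the documented rtu_frame_size of 7.
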
class pymodbus.pdu.mei_message.ReadDeviceInformationResponse(read_code: int | None = None, information: dict[int, Any] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReadDeviceInformationResponse.

classmethod calculateRtuFrameSize(data: bytes) int

Calculate the size of the message.

decode(data: bytes) None

Decode the response.

classmethod decode_sub_function_code(data: bytes) int

Decode sub function code (1 byte).

encode() bytes

Encode the response.

function_code: int = 43
information: dict[int, Any]
sub_function_code: int = 14

Diagnostic record read/write.

class pymodbus.pdu.other_message.GetCommEventCounterRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

GetCommEventCounterRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

decode(data: bytes) None

Decode data part of the message.

encode() bytes

Encode the message.

function_code: int = 11
rtu_frame_size: int = 4
class pymodbus.pdu.other_message.GetCommEventCounterResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

GetCommEventCounterResponse.

decode(data: bytes) None

Decode the response.

encode() bytes

Encode the response.

function_code: int = 11
rtu_frame_size: int = 8
class pymodbus.pdu.other_message.GetCommEventLogRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

GetCommEventLogRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

decode(data: bytes) None

Decode data part of the message.

encode() bytes

Encode the message.

function_code: int = 12
rtu_frame_size: int = 4
class pymodbus.pdu.other_message.GetCommEventLogResponse(status: bool = True, message_count: int = 0, event_count: int = 0, events: list[int] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

GetCommEventLogResponse.

decode(data: bytes) None

Decode the response.

encode() bytes

Encode the response.

function_code: int = 12
rtu_byte_count_pos: int = 2
class pymodbus.pdu.other_message.ReadExceptionStatusRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

ReadExceptionStatusRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

decode(data: bytes) None

Decode data part of the message.

encode() bytes

Encode the message.

function_code: int = 7
rtu_frame_size: int = 4
class pymodbus.pdu.other_message.ReadExceptionStatusResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

ReadExceptionStatusResponse.

decode(data: bytes) None

Decode the response.

encode() bytes

Encode the response.

function_code: int = 7
rtu_frame_size: int = 5
class pymodbus.pdu.other_message.ReportDeviceIdRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

ReportDeviceIdRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

decode(data: bytes) None

Decode data part of the message.

encode() bytes

Encode the message.

function_code: int = 17
rtu_frame_size: int = 4
class pymodbus.pdu.other_message.ReportDeviceIdResponse(identifier: bytes = b'\x00', status: bool = True, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReportDeviceIdResponse.

decode(data: bytes) None

Decode the response.

Since the identifier is device dependent, we just return the raw value that a user can decode to whatever it should be.

encode() bytes

Encode the response.

function_code: int = 17
rtu_byte_count_pos: int = 2

Contains base classes for modbus request/response/error packets.

class pymodbus.pdu.pdu.ModbusPDU(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: object

Base class for all Modbus messages.

address: int
bits: list[bool]
classmethod calculateRtuFrameSize(data: bytes) int

Calculate the size of a PDU.
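
The two class attributes rtu_frame_size and rtu_byte_count_pos listed throughout this module suggest two ways of sizing an RTU frame: a fixed length, or a length derived from a byte-count field. The sketch below illustrates that idea only. The function name and its parameters are hypothetical, and it assumes the usual RTU layout (device id, function code, data, 2-byte CRC); it is not the library's implementation.

```python
def expected_rtu_frame_size(frame: bytes, fixed_size: int = 0, byte_count_pos: int = 0) -> int:
    """Return the expected RTU frame length, or 0 if it cannot be determined yet.

    Hypothetical helper: fixed_size and byte_count_pos play the role of the
    rtu_frame_size and rtu_byte_count_pos class attributes.
    """
    if fixed_size:
        # Fixed-length message, e.g. 8 bytes for a write single register frame.
        return fixed_size
    if byte_count_pos:
        if len(frame) <= byte_count_pos:
            # The byte-count field has not been received yet.
            return 0
        # Header up to and including the count byte + payload + 2-byte CRC.
        return byte_count_pos + 1 + frame[byte_count_pos] + 2
    return 0


# Read holding registers response carrying 2 registers (4 data bytes).
partial = bytes([0x01, 0x03, 0x04, 0x00, 0x01, 0x00, 0x02])
print(expected_rtu_frame_size(partial, byte_count_pos=2))
```

With byte_count_pos = 2 the total is the byte count plus 5: device id, function code, the count byte itself, and the CRC.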

count: int
async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

decode(data: bytes) None

Decode data part of the message.

classmethod decode_sub_function_code(data: bytes) int

Decode sub function code.

dev_id: int
encode() bytes

Encode the message.

exception_code: int
function_code: int = 0
fut: Future
get_response_pdu_size() int

Calculate response pdu size.

isError() bool

Check whether the response is an error.

registers: list[int]
retries: int
rtu_byte_count_pos: int = 0
rtu_frame_size: int = 0
status: int
sub_function_code: int = -1
transaction_id: int
verifyAddress(address: int = -1) None

Validate API supplied address.

verifyCount(max_count: int, count: int = -1) None

Validate API supplied count.

Register Reading Request/Response.

class pymodbus.pdu.register_message.MaskWriteRegisterRequest(address=0, and_mask=65535, or_mask=0, dev_id=1, transaction_id=0)

Bases: ModbusPDU

MaskWriteRegisterRequest.
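
The and_mask and or_mask arguments follow the Mask Write Register rule from the Modbus application protocol: result = (current AND and_mask) OR (or_mask AND (NOT and_mask)). A minimal sketch of that rule is shown here. The function name is hypothetical and the code does not touch a datastore; it only shows the arithmetic on a 16-bit register value.

```python
def apply_mask_write(current: int, and_mask: int = 0xFFFF, or_mask: int = 0x0000) -> int:
    """Return the register value after a mask write (16-bit result)."""
    # Bits set in and_mask are kept from the current value;
    # bits cleared in and_mask are taken from or_mask.
    return ((current & and_mask) | (or_mask & ~and_mask)) & 0xFFFF


# With the default masks (and_mask=65535, or_mask=0) the register is unchanged.
print(hex(apply_mask_write(0x1234)))
# Example values: current 0x12, and_mask 0xF2, or_mask 0x25.
print(hex(apply_mask_write(0x12, and_mask=0xF2, or_mask=0x25)))
```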

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

decode(data: bytes) None

Decode the incoming request.

encode() bytes

Encode the request packet.

function_code: int = 22
rtu_frame_size: int = 10
class pymodbus.pdu.register_message.MaskWriteRegisterResponse(address=0, and_mask=65535, or_mask=0, dev_id=1, transaction_id=0)

Bases: ModbusPDU

MaskWriteRegisterResponse.

decode(data: bytes) None

Decode the response.

encode() bytes

Encode the response.

function_code: int = 22
rtu_frame_size: int = 10
class pymodbus.pdu.register_message.ReadHoldingRegistersRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

ReadHoldingRegistersRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

decode(data: bytes) None

Decode a register request packet.

encode() bytes

Encode the request packet.

function_code: int = 3
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Byte Count(1 byte) + 2 * Quantity of registers (== byte count).
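
The size formula above can be checked by hand. The following is a small sketch with a hypothetical function name; it only restates the arithmetic and does not call into the library.

```python
def read_registers_response_pdu_size(count: int) -> int:
    """Function code (1 byte) + byte count (1 byte) + 2 bytes per register."""
    return 1 + 1 + 2 * count


# Reading 10 registers yields a 22-byte response PDU.
print(read_registers_response_pdu_size(10))
```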

rtu_frame_size: int = 8
class pymodbus.pdu.register_message.ReadHoldingRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

ReadHoldingRegistersResponse.

decode(data: bytes) None

Decode a register response packet.

encode() bytes

Encode the response packet.

function_code: int = 3
rtu_byte_count_pos: int = 2
class pymodbus.pdu.register_message.ReadInputRegistersRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ReadHoldingRegistersRequest

ReadInputRegistersRequest.

function_code: int = 4
class pymodbus.pdu.register_message.ReadInputRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ReadHoldingRegistersResponse

ReadInputRegistersResponse.

function_code: int = 4
class pymodbus.pdu.register_message.ReadWriteMultipleRegistersRequest(read_address: int = 0, read_count: int = 0, write_address: int = 0, write_registers: list[int] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReadWriteMultipleRegistersRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

decode(data: bytes) None

Decode the register request packet.

encode() bytes

Encode the request packet.

function_code: int = 23
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Byte Count (1 byte) + 2 * Quantity of registers to read (n bytes)

rtu_byte_count_pos: int = 10
class pymodbus.pdu.register_message.ReadWriteMultipleRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ReadHoldingRegistersResponse

ReadWriteMultipleRegistersResponse.

function_code: int = 23
class pymodbus.pdu.register_message.WriteMultipleRegistersRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

WriteMultipleRegistersRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

decode(data: bytes) None

Decode a write multiple registers request packet.

encode() bytes

Encode a write multiple registers request packet.

function_code: int = 16
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Starting Address (2 bytes) + Quantity of Registers (2 bytes)

rtu_byte_count_pos: int = 6
class pymodbus.pdu.register_message.WriteMultipleRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

WriteMultipleRegistersResponse.

decode(data: bytes) None

Decode a write multiple registers response packet.

encode() bytes

Encode a write multiple registers response packet.

function_code: int = 16
rtu_frame_size: int = 8
class pymodbus.pdu.register_message.WriteSingleRegisterRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: WriteSingleRegisterResponse

WriteSingleRegisterRequest.

async datastore_update(context: ModbusServerContext, device_id: int) ModbusPDU

Update diagnostic request on the given device.

get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Register Address (2 bytes) + Register Value (2 bytes)

class pymodbus.pdu.register_message.WriteSingleRegisterResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

WriteSingleRegisterResponse.

decode(data: bytes) None

Decode a write single register packet.

encode() bytes

Encode a write single register packet.

function_code: int = 6
rtu_frame_size: int = 8

Modbus Device Controller.

These are the device management handlers. They should be maintained in the server context and the various methods should be inserted in the correct locations.

class pymodbus.pdu.device.DeviceInformationFactory

Bases: object

Helper that hides some of the complexity of processing device information requests (function code 0x2b, sub-function code 0x0e).

classmethod get(control, read_code=DeviceInformation.BASIC, object_id=0)

Get the requested device data from the system.

Parameters:
  • control – The control block to pull data from

  • read_code – The read code to process

  • object_id – The specific object_id to read

Returns:

The requested data (id, length, value)

class pymodbus.pdu.device.ModbusDeviceIdentification(info=None, info_name=None)

Bases: object

Supplies the device identification for the readDeviceIdentification function.

For more information read section 6.21 of the modbus application protocol.

property MajorMinorRevision
property ModelName
property ProductCode
property ProductName
property UserApplicationName
property VendorName
property VendorUrl
stat_data = {0: '', 1: '', 2: '', 3: '', 4: '', 5: '', 6: '', 7: '', 8: ''}
summary()

Return a summary of the main items.

Returns:

A dictionary of the main items

update(value)

Update the values of this identity using another identity as the value.

Parameters:

value – The value to copy values from

class pymodbus.pdu.device.ModbusPlusStatistics

Bases: object

This is used to maintain the current modbus plus statistics count.

As of right now this is simply a stub to complete the modbus implementation. For more information, see the modbus implementation guide page 87.

encode() list[int]

Return a summary of the modbus plus statistics.

Returns:

An iterator over lists of 8-bit integers representing each statistic

reset() None

Clear all of the modbus plus statistics.

stat_data: dict[str, list[int]] = {'__unused_10_lowbit': [0], 'active_station_bit_map': [0, 0, 0, 0, 0, 0, 0, 0], 'bad_link_address_error': [0], 'bad_mac_function_code_error': [0], 'bad_packet_length_error': [0], 'communication_failed_error': [0], 'communication_retries': [0], 'data_id_command_transfer': [0], 'data_id_input_path': [0, 0, 0, 0, 0, 0, 0, 0], 'data_id_token_owner': [0], 'data_master_output_path': [0, 0, 0, 0, 0, 0, 0, 0], 'data_master_token_failed': [0], 'data_master_token_owner': [0], 'exception_response_error': [0], 'forgotten_transaction_error': [0], 'frame_size_error': [0], 'global_data_bit_map': [0, 0, 0, 0, 0, 0, 0, 0], 'good_receive_packet': [0], 'internal_packet_length_error': [0], 'mac_state_variable': [0, 0], 'network_address': [0, 0], 'no_response_error': [0], 'node_type_id': [0, 0], 'peer_status_code': [0, 0], 'pretransmit_deferral_error': [0], 'program_id_auto_logout': [0], 'program_id_command_transfer': [0], 'program_id_input_path': [0, 0, 0, 0, 0, 0, 0, 0], 'program_id_token_owner': [0], 'program_master_connect_status': [0], 'program_master_outptu_path': [0, 0, 0, 0, 0, 0, 0, 0], 'program_master_rsp_transfer': [0], 'program_master_token_failed': [0], 'program_master_token_owner': [0], 'receive_buffer_dma_overrun': [0], 'receive_buffer_use_bit_map': [0, 0, 0, 0, 0, 0, 0, 0], 'receiver_alignment_error': [0], 'receiver_collision_abort_error': [0], 'receiver_crc_error': [0], 'repeated_command_received': [0], 'software_version_number': [0, 0], 'token_pass_counter': [0, 0], 'token_rotation_time': [0, 0], 'token_station_bit_map': [0, 0, 0, 0, 0, 0, 0, 0], 'transmit_buffer_dma_underrun': [0], 'unexpected_path_error': [0], 'unexpected_response_error': [0]}
summary()

Return a summary of the modbus plus statistics.

Returns:

54 16-bit words representing the status

Modbus Remote Events.

An event byte returned by the Get Communications Event Log function can be any one of four types. The type is defined by bit 7 (the high-order bit) in each byte. It may be further defined by bit 6.
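
The four event types can be told apart from the byte value alone. The sketch below is one way to do that, assuming the fixed values given in the class descriptions of this module (04 hex for entering Listen Only Mode, zero for a communications restart). The function name and the returned labels are hypothetical.

```python
def classify_event(byte: int) -> str:
    """Classify a single event byte from a Get Communications Event Log reply."""
    if byte & 0x80:
        # Bit 7 set: the device received a message.
        return "receive"
    if byte & 0x40:
        # Bit 7 clear, bit 6 set: the device finished processing a request.
        return "send"
    if byte == 0x04:
        return "entered_listen_mode"
    if byte == 0x00:
        return "communication_restart"
    return "unknown"


for raw in (0xC0, 0x41, 0x04, 0x00):
    print(hex(raw), classify_event(raw))
```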

class pymodbus.pdu.events.CommunicationRestartEvent

Bases: ModbusEvent

Remote device Initiated Communication Restart.

The remote device stores this type of event byte when its communications port is restarted. The remote device can be restarted by the Diagnostics function (code 08), with sub-function Restart Communications Option (code 00 01).

That function also places the remote device into a “Continue on Error” or “Stop on Error” mode. If the remote device is placed into “Continue on Error” mode, the event byte is added to the existing event log. If the remote device is placed into “Stop on Error” mode, the byte is added to the log and the rest of the log is cleared to zeros.

The event is defined by a content of zero.

decode(event)

Decode the event message to its status bits.

Parameters:

event – The event to decode

Raises:

ParameterException

encode()

Encode the status bits to an event message.

Returns:

The encoded event message

value = 0
class pymodbus.pdu.events.EnteredListenModeEvent

Bases: ModbusEvent

Remote device Entered Listen Only Mode.

The remote device stores this type of event byte when it enters the Listen Only Mode. The event is defined by a content of 04 hex.

decode(event)

Decode the event message to its status bits.

Parameters:

event – The event to decode

Raises:

ParameterException

encode()

Encode the status bits to an event message.

Returns:

The encoded event message

value = 4
class pymodbus.pdu.events.ModbusEvent

Bases: ABC

Define modbus events.

abstractmethod decode(event)

Decode the event message to its status bits.

Parameters:

event – The event to decode

abstractmethod encode() bytes

Encode the status bits to an event message.

class pymodbus.pdu.events.RemoteReceiveEvent(overrun=False, listen=False, broadcast=False)

Bases: ModbusEvent

Remote device MODBUS Receive Event.

The remote device stores this type of event byte when a query message is received. It is stored before the remote device processes the message. This event is defined by bit 7 set to logic “1”. The other bits will be set to a logic “1” if the corresponding condition is TRUE. The bit layout is:

Bit Contents
----------------------------------
0   Not Used
1   Communication Error
2   Not Used
3   Not Used
4   Character Overrun
5   Currently in Listen Only Mode
6   Broadcast Receive
7   1
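
The bit layout above maps onto the three constructor flags (overrun, listen, broadcast). The following is a minimal sketch of that mapping using plain integers. The function names are hypothetical and the code is independent of the class; it is meant only to make the bit positions concrete.

```python
def encode_receive_event(overrun: bool = False, listen: bool = False, broadcast: bool = False) -> int:
    """Build a receive event byte; bit 7 is always set."""
    value = 0x80
    if overrun:
        value |= 0x10  # bit 4: Character Overrun
    if listen:
        value |= 0x20  # bit 5: Currently in Listen Only Mode
    if broadcast:
        value |= 0x40  # bit 6: Broadcast Receive
    return value


def decode_receive_event(value: int) -> dict[str, bool]:
    """Extract the three status flags from a receive event byte."""
    return {
        "overrun": bool(value & 0x10),
        "listen": bool(value & 0x20),
        "broadcast": bool(value & 0x40),
    }


print(hex(encode_receive_event(overrun=True, broadcast=True)))
print(decode_receive_event(0xA0))
```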
decode(event: bytes) None

Decode the event message to its status bits.

Parameters:

event – The event to decode

encode() bytes

Encode the status bits to an event message.

Returns:

The encoded event message

class pymodbus.pdu.events.RemoteSendEvent(read=False, device_abort=False, device_busy=False, device_nak=False, write_timeout=False, listen=False)

Bases: ModbusEvent

Remote device MODBUS Send Event.

The remote device stores this type of event byte when it finishes processing a request message. It is stored if the remote device returned a normal or exception response, or no response.

This event is defined by bit 7 set to a logic “0”, with bit 6 set to a “1”. The other bits will be set to a logic “1” if the corresponding condition is TRUE. The bit layout is:

Bit Contents
-----------------------------------------------------------
0   Read Exception Sent (Exception Codes 1-3)
1   Device Abort Exception Sent (Exception Code 4)
2   Device Busy Exception Sent (Exception Codes 5-6)
3   Device Program NAK Exception Sent (Exception Code 7)
4   Write Timeout Error Occurred
5   Currently in Listen Only Mode
6   1
7   0
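
The bit layout above lines up with the six constructor flags in order, starting at bit 0. A sketch of that mapping with plain integers is given here. The names SEND_EVENT_FLAGS, encode_send_event and decode_send_event are hypothetical and the code does not use the class itself.

```python
# Flag names in bit order (bit 0 first), mirroring the constructor arguments.
SEND_EVENT_FLAGS = ("read", "device_abort", "device_busy", "device_nak", "write_timeout", "listen")


def encode_send_event(**flags: bool) -> int:
    """Build a send event byte; bit 6 is always set and bit 7 is always clear."""
    value = 0x40
    for bit, name in enumerate(SEND_EVENT_FLAGS):
        if flags.get(name, False):
            value |= 1 << bit
    return value


def decode_send_event(value: int) -> dict[str, bool]:
    """Extract the six status flags from a send event byte."""
    return {name: bool(value & (1 << bit)) for bit, name in enumerate(SEND_EVENT_FLAGS)}


print(hex(encode_send_event(read=True, listen=True)))
print(decode_send_event(0x44))
```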
decode(event)

Decode the event message to its status bits.

Parameters:

event – The event to decode

encode()

Encode the status bits to an event message.

Returns:

The encoded event message