Reputation: 101
as the title suggests I'm trying to develop a modbus tcp / ip simulator to emulate the interactions between multiple devices and a plc. The plc modbus comm is fine so I won't talk about how I did it. Now, regarding the python part, I'm new to kind of applications so I did some research, also with the help of AI, regarding the functionalities of the pymodbus library. Basically, my aim is to read from a mysql server some device configurations (this part works fine so I won't insert it in the code) and to instatiate as many servers as the records read. These servers should then respond to the requests of a plc. Here's my code:
class modbus_server:
list = []
def __init__(self, id, name, device_type_id, enable, deleted, modbus_id, device_name, modbus_enable, protocol, host, port, unit_id, byte_order, word_order, polling_period,
modbus_deleted, model, manufacturer, firmware_rev, device_info, type_deleted, devicetype_datastruct_id, variable_name, devicetype_datastruct_description, modbus_function, modbus_address, modbus_data_type, modbus_access, measure_unit, gain, diff, enum_description,
value, enum_type, priority, bitmask_description, position, on_value):
self.id = id
self.name = name
self.device_type_id = device_type_id
self.enable = enable
self.deleted = deleted
self.modbus_id = modbus_id
self.device_name = device_name
self.modbus_enable = modbus_enable
self.protocol = protocol
self.host = host
self.port = port
self.unit_id = unit_id
self.byte_order = byte_order
self.word_order = word_order
self.polling_period = polling_period
self.modbus_deleted = modbus_deleted
self.model = model
self.manufacturer = manufacturer
self.firmware_rev = firmware_rev
self.device_info = device_info
self.type_deleted = type_deleted
self.devicetype_datastruct_id = devicetype_datastruct_id
self.variable_name = variable_name
self.devicetype_datastruct_description = devicetype_datastruct_description
self.modbus_function = modbus_function
self.modbus_address = modbus_address
self.modbus_data_type = modbus_data_type
self.modbus_access = modbus_access
self.measure_unit = measure_unit
self.gain = gain
self.diff = diff
self.enum_description = enum_description
self.value = value
self.enum_type = enum_type
self.priority = priority
self.bitmask_description = bitmask_description
self.position = position
self.on_value = on_value
def run_server(self):
self.memory = ModbusSlaveContext(
di = ModbusSequentialDataBlock(0, [0]*100),
co = ModbusSequentialDataBlock(0, [0]*100),
hr = ModbusSequentialDataBlock(0, [0]*100),
ir = ModbusSequentialDataBlock(0, [0]*100)
)
self.context = ModbusServerContext(slaves = self.memory, single = True)
loop = asyncio.get_event_loop()
self.start_server = StartTcpServer(self.context, address = self.host, loop = loop)
self.server_update_values()
logging.info(f"Server Modbus avviato su {self.host}:{self.port}")
loop.run_until_complete()
def server_update_values(self):
super().setValues(self.modbus_function, self.modbus_address, self.value)
function_code_map = {1: 'Coils', 2: 'Discrete Inputs', 3: 'Holding Registers', 4: 'Input Registers'}
logging.info(f"Ricevuta richiesta di scrittura : funzione {function_code_map.get(self.modbus_function)}")
def main():
mydb = mysql.connector.connect(
host = "localhost",
user = "user",
password = "mypswrd",
database="mydb"
)
if mydb.is_connected():
#device.configure_device(mydb)
#device_modbus.configure_device_modbus(mydb)
#enumeration.configure_device_enumeration(mydb)
#bitmask.configure_device_bitmask(mydb)
#devicetype_data_structure.configure_devicetype_data_structure_modbus(mydb)
#device_type.configure_device_type(mydb)
#setting.configure_device_setting(mydb)
modbus_server.inner_join_tables(mydb)
logging.basicConfig(format = '%(asctime)s %(message)s', level = logging.INFO)
for server in modbus_server.list:
server.run_server()
else:
print("Connection refused")
print(modbus_server.list)
main()
Do you think it's the best solution for my objective? Any constructive advice would ber great. Also I'm not sure if the way the sderver starts is correct, because I don't think that I'm specifying the port on which the server should start.
Edit: I had troubles starting the tcp server so instead of self.start_server = StartTcpServer(self.context, address = self.host, loop = loop) I wrote StartTcpServer(self.context, address = (self.host, self.port), loop = loop)
but still doesn't work as I get the following error: TypeError: StartTcpServer() takes 0 positional arguments but 1 was given
Upvotes: 0
Views: 246