Gabriele Privitera

Reputation: 101

Creating a Modbus TCP/IP server simulator in Python using pymodbus

As the title suggests, I'm trying to develop a Modbus TCP/IP simulator to emulate the interactions between multiple devices and a PLC. The PLC-side Modbus communication works fine, so I won't cover how I did it. On the Python side, I'm new to this kind of application, so I did some research, partly with the help of AI, into what the pymodbus library offers. My aim is to read device configurations from a MySQL server (this part works fine, so I've left it out of the code) and to instantiate as many servers as there are records. These servers should then respond to requests from a PLC. Here's my code:

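# Imports inferred from the names used below; the pymodbus paths assume a 3.x layout.
import asyncio
import logging

import mysql.connector
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusServerContext, ModbusSlaveContext
from pymodbus.server import StartTcpServer


class modbus_server: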
     list = []
     def __init__(self, id, name, device_type_id, enable, deleted, modbus_id, device_name, modbus_enable, protocol, host, port, unit_id, byte_order, word_order, polling_period, 
                  modbus_deleted, model, manufacturer, firmware_rev, device_info, type_deleted, devicetype_datastruct_id, variable_name, devicetype_datastruct_description, modbus_function, modbus_address, modbus_data_type, modbus_access, measure_unit, gain, diff, enum_description, 
                  value, enum_type, priority, bitmask_description, position, on_value):
          self.id = id
          self.name = name
          self.device_type_id = device_type_id
          self.enable = enable
          self.deleted = deleted
          self.modbus_id = modbus_id
          self.device_name = device_name
          self.modbus_enable = modbus_enable
          self.protocol = protocol
          self.host = host
          self.port = port
          self.unit_id = unit_id
          self.byte_order = byte_order
          self.word_order = word_order
          self.polling_period = polling_period
          self.modbus_deleted = modbus_deleted
          self.model = model
          self.manufacturer = manufacturer
          self.firmware_rev = firmware_rev
          self.device_info = device_info
          self.type_deleted = type_deleted
          self.devicetype_datastruct_id = devicetype_datastruct_id
          self.variable_name = variable_name
          self.devicetype_datastruct_description = devicetype_datastruct_description
          self.modbus_function = modbus_function
          self.modbus_address = modbus_address
          self.modbus_data_type = modbus_data_type
          self.modbus_access = modbus_access
          self.measure_unit = measure_unit
          self.gain = gain
          self.diff = diff
          self.enum_description = enum_description
          self.value = value
          self.enum_type = enum_type
          self.priority = priority
          self.bitmask_description = bitmask_description
          self.position = position
          self.on_value = on_value

     
     def run_server(self):
          self.memory = ModbusSlaveContext(
               di = ModbusSequentialDataBlock(0, [0]*100),
               co = ModbusSequentialDataBlock(0, [0]*100),
               hr = ModbusSequentialDataBlock(0, [0]*100),
               ir = ModbusSequentialDataBlock(0, [0]*100)
          )

          self.context = ModbusServerContext(slaves = self.memory, single = True)
          loop = asyncio.get_event_loop()
          self.start_server = StartTcpServer(self.context, address = self.host, loop = loop)
          self.server_update_values()
          logging.info(f"Modbus server started on {self.host}:{self.port}")
          loop.run_until_complete()

     def server_update_values(self):
          super().setValues(self.modbus_function, self.modbus_address, self.value)
          function_code_map = {1: 'Coils', 2: 'Discrete Inputs', 3: 'Holding Registers', 4: 'Input Registers'}
          logging.info(f"Write request received: function {function_code_map.get(self.modbus_function)}")


def main():
    mydb = mysql.connector.connect(
        host = "localhost",
        user = "user",
        password = "mypswrd",
        database="mydb"
    )

    if mydb.is_connected():
        #device.configure_device(mydb)
        #device_modbus.configure_device_modbus(mydb)
        #enumeration.configure_device_enumeration(mydb)
        #bitmask.configure_device_bitmask(mydb)
        #devicetype_data_structure.configure_devicetype_data_structure_modbus(mydb)
        #device_type.configure_device_type(mydb)
        #setting.configure_device_setting(mydb)
        modbus_server.inner_join_tables(mydb)
        logging.basicConfig(format = '%(asctime)s %(message)s', level = logging.INFO)
        for server in modbus_server.list:
             server.run_server()

    else:
         print("Connection refused")

    print(modbus_server.list)


main()

Do you think this is the best approach for my objective? Any constructive advice would be great. I'm also not sure whether the way the server starts is correct, because I don't think I'm specifying the port on which the server should listen.
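
For context on the structure being asked about: main() calls run_server() on each object in turn, and run_server() appears intended to block on the event loop, so the later servers would presumably never get a chance to start. Below is a minimal sketch of one alternative, where every listener is started inside a single event loop before any of them is queried. It deliberately uses only asyncio from the standard library, not pymodbus, and it does not speak Modbus: the DEVICES records, the handler that replies with the device name, and the query() helper are all hypothetical stand-ins for the MySQL rows, the Modbus data store and the PLC.

```python
import asyncio

# Hypothetical device records; in the question these come from MySQL.
# Port 0 lets the operating system pick a free port for the sketch.
DEVICES = [
    {"name": "device_a", "host": "127.0.0.1", "port": 0},
    {"name": "device_b", "host": "127.0.0.1", "port": 0},
]


async def start_device_server(device):
    """Start one TCP listener that answers each connection with the device name."""

    async def handle(reader, writer):
        await reader.read(16)  # wait for a request; its content is ignored here
        writer.write(device["name"].encode())
        await writer.drain()
        writer.close()
        await writer.wait_closed()

    return await asyncio.start_server(handle, device["host"], device["port"])


async def query(host, port):
    """Connect as a client, send a dummy request and return the reply."""
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(b"ping")
    await writer.drain()
    reply = await reader.read(64)
    writer.close()
    await writer.wait_closed()
    return reply.decode()


async def demo():
    # Start every listener first, so none of them blocks the others.
    servers = await asyncio.gather(*(start_device_server(d) for d in DEVICES))
    replies = []
    for server in servers:
        host, port = server.sockets[0].getsockname()[:2]
        replies.append(await query(host, port))
    for server in servers:
        server.close()
        await server.wait_closed()
    return replies


replies = asyncio.run(demo())
print(replies)
```

The point of the sketch is only the shape: one coroutine per device, all scheduled in the same loop, with each device bound to its own (host, port) pair. Whether pymodbus exposes an equivalent asynchronous entry point depends on the installed version and should be checked against its documentation.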

Edit: I had trouble starting the TCP server, so instead of self.start_server = StartTcpServer(self.context, address = self.host, loop = loop) I wrote StartTcpServer(self.context, address = (self.host, self.port), loop = loop). It still doesn't work; I get the following error: TypeError: StartTcpServer() takes 0 positional arguments but 1 was given
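
The wording of that TypeError is what Python produces when a function accepts keyword arguments only and is called with a positional one. The sketch below reproduces the message with a hypothetical stand-in, start_tcp_server, which is not the pymodbus function. It assumes, without having checked the installed version, that StartTcpServer is declared in a similar keyword-only way, in which case passing the context as context=... rather than positionally would be the thing to try.

```python
# Hypothetical stand-in for a function declared with keyword arguments only.
# Assumption: the installed pymodbus declares StartTcpServer in a similar way.
def start_tcp_server(**kwargs):
    """Return the keyword arguments received (a real server would block here)."""
    return kwargs


def try_positional(context):
    """Call the stand-in positionally and return the resulting error message."""
    try:
        start_tcp_server(context, address=("127.0.0.1", 5020))
    except TypeError as exc:
        return str(exc)
    return None


dummy_context = {"hr": [0] * 100}  # placeholder for a ModbusServerContext

# Positional call: rejected before the function body ever runs.
positional_error = try_positional(dummy_context)

# Keyword call: accepted, and the address tuple carries both host and port.
keyword_call = start_tcp_server(context=dummy_context, address=("127.0.0.1", 5020))

print(positional_error)
print(keyword_call["address"])
```

Note that the address is passed as a (host, port) tuple, which is also how the port would be specified in the second call quoted in the edit.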

Upvotes: 0

Views: 246

Answers (0)
