# Code : Tout sélectionner
import asyncio
import datetime
from tkinter import Tk, Label, Entry, Button, messagebox, StringVar, Frame
from pymodbus.client import AsyncModbusTcpClient
import re
class ModbusGUI:
    """Tkinter front-end that periodically writes the current date/time to Modbus registers.

    Each cycle writes seven consecutive holding registers, starting at the
    configured start register: year, month, day, hour, minute, second and
    ISO week number.

    Asynchronous work (ping test, Modbus writes) runs as tasks on ``self.loop``.
    The loop is not run by this class: the caller must pump it regularly
    from the Tk main loop.
    """

    # Number of consecutive registers written per cycle; must match the
    # number of values built in update_registers_with_datetime().
    REGISTER_COUNT = 7
    # Modbus register addresses are 16-bit.
    MAX_REGISTER_ADDRESS = 0xFFFF

    def __init__(self, root):
        """Build the window and create the asyncio loop used for background work.

        Args:
            root: the Tk root window that hosts the widgets.
        """
        self.root = root
        self.root.title("Transmission Modbus")
        self.root.protocol("WM_DELETE_WINDOW", self.quit_application)

        # User-editable settings.
        self.ip = StringVar(value="127.0.0.1")
        self.port = StringVar(value="502")
        self.slave_id = StringVar(value="1")
        self.start_register = StringVar(value="0")
        self.delay_ms = StringVar(value="1000")

        # Runtime state.
        self.running = False              # True while a transmission is wanted
        self._reachable = False           # result of the last ping test
        self._transmission_task = None    # asyncio.Task of the active transmission
        self._background_tasks = set()    # strong references to pending tasks

        # Main frame holding the input fields.
        self.frame = Frame(root)
        self.frame.pack(padx=10, pady=10, expand=True, fill="both")

        # One (caption, variable, entry width) triple per input row.
        fields = (
            ("Adresse IP :", self.ip, 15),
            ("Port :", self.port, 7),
            ("ID Esclave :", self.slave_id, 7),
            ("Registre de départ :", self.start_register, 8),
            ("Délai (ms) :", self.delay_ms, 7),
        )
        for row, (caption, variable, width) in enumerate(fields):
            Label(self.frame, text=caption).grid(
                row=row, column=0, sticky="w", padx=5, pady=5
            )
            Entry(
                self.frame, textvariable=variable, justify="center", width=width
            ).grid(row=row, column=1, padx=5, pady=5)

        self.output_label = Label(
            self.frame, text="", justify="left", anchor="w", wraplength=400
        )
        self.output_label.grid(
            row=5, column=0, columnspan=2, sticky="w", padx=5, pady=5
        )

        self.test_button = Button(self.frame, text="Test IP", command=self.test_ip)
        self.test_button.grid(row=6, column=0, padx=5, pady=5)
        self.start_button = Button(
            self.frame,
            text="Démarrer Transmission",
            command=self.start_transmission,
            state="disabled",
        )
        self.start_button.grid(row=6, column=1, padx=5, pady=5)
        self.stop_button = Button(
            self.frame,
            text="Arrêter Transmission",
            command=self.stop_transmission,
            state="disabled",
        )
        self.stop_button.grid(row=7, column=0, columnspan=2, padx=5, pady=5)

        # asyncio.get_event_loop() is deprecated when no loop is running and
        # no longer creates one on recent Python versions, so create the loop
        # explicitly and register it as the current one.
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _spawn(self, coro):
        """Schedule *coro* on ``self.loop`` and keep a reference until it finishes.

        The event loop only keeps weak references to tasks, so the task is
        stored in ``self._background_tasks`` until it completes.
        """
        task = self.loop.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    def _is_transmitting(self):
        """Return True while a transmission task exists and has not finished."""
        task = self._transmission_task
        return task is not None and not task.done()

    def _refresh_buttons(self):
        """Derive the start/stop button states from the current runtime state."""
        busy = self._is_transmitting()
        can_start = self._reachable and not busy
        # Once a stop was requested (running is False) the stop button is
        # disabled even if the task has not finished unwinding yet.
        can_stop = busy and self.running
        self.start_button.config(state="normal" if can_start else "disabled")
        self.stop_button.config(state="normal" if can_stop else "disabled")

    def _on_transmission_done(self, task):
        """Done-callback of the transmission task: reset state and buttons."""
        if self._transmission_task is task:
            self._transmission_task = None
        self.running = False
        self._refresh_buttons()

    @staticmethod
    async def _close_client(client):
        """Close *client* whether its ``close()`` is a coroutine or a plain method.

        NOTE(review): pymodbus changed ``close()`` on the async client from a
        coroutine to a synchronous method during the 3.x series; awaiting the
        ``None`` returned by the synchronous variant raises TypeError.
        """
        closing = client.close()
        if asyncio.iscoroutine(closing) or asyncio.isfuture(closing):
            await closing

    # ------------------------------------------------------------------
    # Reachability test
    # ------------------------------------------------------------------
    async def test_ip_async(self, ip):
        """Ping *ip* once and return True when ``ping`` exits with status 0.

        Returns False when the ``ping`` executable cannot be started.
        """
        import sys  # local import: only needed to choose the ping flag

        # Windows' ping takes the packet count with -n, other systems use -c.
        count_flag = "-n" if sys.platform.startswith("win") else "-c"
        try:
            proc = await asyncio.create_subprocess_exec(
                "ping", count_flag, "1", ip,
                # The output is not used, so discard it.
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
        except (OSError, NotImplementedError):
            # ping is missing, or the loop implementation cannot spawn
            # subprocesses: report the host as unreachable.
            return False
        return await proc.wait() == 0

    def test_ip(self):
        """Button handler: validate the address, then ping it in the background."""
        ip = self.ip.get().strip()
        # Validate before handing the text to ping, so a value such as "-t"
        # cannot be interpreted as a command-line option.
        if not self.validate_ip(ip):
            messagebox.showerror("Erreur", "L'adresse IP est invalide.")
            return
        self._spawn(self._test_ip_and_notify(ip))

    async def _test_ip_and_notify(self, ip):
        """Run the ping test and report the result in the output label."""
        self.output_label.config(text=f"Test de connexion à {ip}...")
        self._reachable = await self.test_ip_async(ip)
        if self._reachable:
            self.output_label.config(text="Vous êtes connecté au serveur Modbus")
        else:
            self.output_label.config(text=f"L'adresse {ip} n'est pas joignable.")
        # Button states depend on the transmission state as well, so a ping
        # result never disables "stop" or enables "start" while transmitting.
        self._refresh_buttons()

    # ------------------------------------------------------------------
    # Transmission
    # ------------------------------------------------------------------
    async def update_registers_with_datetime(self, ip, port, slave_id, start_register, delay_ms):
        """Write the current date/time to consecutive registers until stopped.

        Args:
            ip: address of the Modbus TCP server.
            port: TCP port (anything ``int()`` accepts).
            slave_id: Modbus unit id (anything ``int()`` accepts).
            start_register: address of the first register (int).
            delay_ms: pause between two write cycles, in milliseconds.

        The loop runs while ``self.running`` is true and also ends when the
        task is cancelled. Errors are reported in the GUI, not raised.
        """
        client = AsyncModbusTcpClient(ip, port=int(port))
        try:
            unit = int(slave_id)
            if not await client.connect():
                raise ConnectionError(f"Impossible de se connecter à {ip}:{port}")
            while self.running:
                now = datetime.datetime.now()
                values = (
                    now.year,
                    now.month,
                    now.day,
                    now.hour,
                    now.minute,
                    now.second,
                    now.isocalendar()[1],  # ISO week number
                )
                lines = ["Transmission en cours..."]
                for offset, value in enumerate(values):
                    address = start_register + offset
                    try:
                        # NOTE(review): `slave=` is the keyword used by the
                        # original code; newer pymodbus releases may expect
                        # `device_id=` instead - confirm against the
                        # installed version.
                        write_result = await client.write_register(
                            address, value, slave=unit
                        )
                    except Exception as e:
                        # One failed register must not abort the cycle.
                        lines.append(
                            f"Erreur lors de l'écriture au registre {address}: {e}"
                        )
                    else:
                        if write_result.isError():
                            lines.append(f"Erreur au registre {address}.")
                        else:
                            lines.append(f"Registre {address}: {value}")
                self.output_label.config(text="\n".join(lines) + "\n")
                await asyncio.sleep(delay_ms / 1000)
        except asyncio.CancelledError:
            # Raised by stop_transmission() / quit_application().
            self.output_label.config(text="Transmission arrêtée.")
        except ConnectionError as e:
            self.output_label.config(text=str(e))
        except Exception as e:
            messagebox.showerror("Erreur", f"Erreur lors de la transmission : {e}")
        else:
            # The loop ended because self.running was cleared.
            self.output_label.config(text="Transmission arrêtée.")
        finally:
            # Clear the flag first so it is reset even if closing fails.
            self.running = False
            await self._close_client(client)

    def start_transmission(self):
        """Button handler: validate all fields and start the transmission task."""
        if self._is_transmitting():
            # Never run two transmissions at the same time.
            return

        ip = self.ip.get().strip()
        port = self.port.get()
        slave_id = self.slave_id.get()
        start_register = self.start_register.get()
        delay_ms = self.delay_ms.get()

        max_start = self.MAX_REGISTER_ADDRESS - (self.REGISTER_COUNT - 1)
        # Input validation: (validator, value, error message), checked in order.
        checks = (
            (self.validate_ip, ip, "L'adresse IP est invalide."),
            (self.validate_port, port,
             "Le port doit être un nombre entier valide."),
            (self.validate_slave_id, slave_id,
             "L'ID de l'esclave doit être un nombre entier valide."),
            (self.validate_register, start_register,
             f"Le registre de départ doit être un nombre entier entre 0 et {max_start}."),
            (self.validate_delay_ms, delay_ms,
             "Le délai doit être un nombre entier entre 10ms et 10000ms."),
        )
        for validator, value, message in checks:
            if not validator(value):
                messagebox.showerror("Erreur", message)
                return

        self.running = True
        task = self._spawn(
            self.update_registers_with_datetime(
                ip, port, slave_id, int(start_register), int(delay_ms)
            )
        )
        # The done-callback also fires when the task is cancelled before its
        # first step, so the buttons can never get stuck.
        task.add_done_callback(self._on_transmission_done)
        self._transmission_task = task
        self._refresh_buttons()

    def stop_transmission(self):
        """Button handler: stop the running transmission without closing the window."""
        self.running = False
        task = self._transmission_task
        if task is not None and not task.done():
            # Cancelling wakes the coroutine out of its sleep or pending
            # write immediately instead of waiting for the next cycle.
            task.cancel()
        self._refresh_buttons()

    def quit_application(self):
        """Stop any transmission, let it close its connection, then destroy the window."""
        self.running = False
        task = self._transmission_task
        if task is not None and not task.done():
            task.cancel()
            if not self.loop.is_running() and not self.loop.is_closed():
                # Give the cancelled coroutine a bounded chance to run its
                # cleanup while the widgets it updates still exist.
                self.loop.run_until_complete(asyncio.wait([task], timeout=2))
        self.root.quit()
        self.root.destroy()

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------
    def validate_ip(self, ip):
        """Return True if *ip* is a dotted-quad IPv4 address (e.g. 192.168.0.1)."""
        if not isinstance(ip, str):
            return False
        # fullmatch (unlike match with "$") also rejects a trailing newline.
        if re.fullmatch(r"(?:[0-9]{1,3}\.){3}[0-9]{1,3}", ip) is None:
            return False
        return all(int(octet) <= 255 for octet in ip.split("."))

    def validate_port(self, port):
        """Return True if *port* converts to an int in 1..65535."""
        try:
            return 1 <= int(port) <= 65535
        except (TypeError, ValueError):
            return False

    def validate_slave_id(self, slave_id):
        """Return True if *slave_id* converts to an int in 0..255."""
        try:
            return 0 <= int(slave_id) <= 255
        except (TypeError, ValueError):
            return False

    def validate_register(self, start_register):
        """Return True if all registers written from *start_register* are valid.

        REGISTER_COUNT consecutive registers are written, so the last one
        must not exceed MAX_REGISTER_ADDRESS.
        """
        try:
            first = int(start_register)
        except (TypeError, ValueError):
            return False
        return 0 <= first <= self.MAX_REGISTER_ADDRESS - (self.REGISTER_COUNT - 1)

    def validate_delay_ms(self, delay_ms):
        """Return True if *delay_ms* converts to an int in 10..10000 (10 ms to 10 s)."""
        try:
            return 10 <= int(delay_ms) <= 10000
        except (TypeError, ValueError):
            return False
if __name__ == "__main__":
    root = Tk()
    app = ModbusGUI(root)

    # Delay between two slices of asyncio work, in milliseconds.
    poll_interval_ms = 10

    # Integrate asyncio with Tkinter: Tk owns the main loop and periodically
    # hands a short slice of time to the asyncio loop.
    def run_asyncio_loop():
        """Run the ready asyncio callbacks once, then re-arm the Tk timer."""
        try:
            # A modal messagebox opened from inside a coroutine spins a nested
            # Tk loop while asyncio is still running; skip the tick instead of
            # re-entering the loop (which would raise RuntimeError).
            if not app.loop.is_running():
                # sleep(0) processes ready callbacks, due timers and pending
                # I/O without blocking the GUI thread. The previous
                # sleep(0.1) froze the window for 100 ms on every tick.
                app.loop.run_until_complete(asyncio.sleep(0))
        finally:
            # Always re-arm the timer. An unexpected exception is not
            # swallowed: it propagates to Tk, which reports callback errors
            # without leaving mainloop().
            root.after(poll_interval_ms, run_asyncio_loop)

    # Start pumping asyncio from the Tk main loop.
    root.after(poll_interval_ms, run_asyncio_loop)
    root.mainloop()