Code corrigé pour transmettre en BCD dans l'ordre exacte pour Schneider avec un compteur de requête en mode client :
CLIENT :
Code : Tout sélectionner
import asyncio
import datetime
from tkinter import Tk, Label, Entry, Button, messagebox, StringVar, Frame
from pymodbus.client import AsyncModbusTcpClient
import re
class ModbusGUI:
def __init__(self, root):
self.root = root
self.root.title("Transmission Modbus")
self.root.protocol("WM_DELETE_WINDOW", self.quit_application)
# Variables
self.ip = StringVar(value="127.0.0.1")
self.port = StringVar(value="502")
self.slave_id = StringVar(value="1")
self.start_register = StringVar(value="0")
self.delay_ms = StringVar(value="1000")
self.running = False
self.request_count = 0 # Compteur pour les requêtes envoyées
# Frame principal pour centrer les champs de saisie
self.frame = Frame(root)
self.frame.pack(padx=10, pady=10, expand=True, fill="both")
# Widgets
Label(self.frame, text="Adresse IP :").grid(row=0, column=0, sticky="w", padx=5, pady=5)
Entry(self.frame, textvariable=self.ip, justify="center", width=15).grid(row=0, column=1, padx=5, pady=5)
Label(self.frame, text="Port :").grid(row=1, column=0, sticky="w", padx=5, pady=5)
Entry(self.frame, textvariable=self.port, justify="center", width=7).grid(row=1, column=1, padx=5, pady=5)
Label(self.frame, text="ID Esclave :").grid(row=2, column=0, sticky="w", padx=5, pady=5)
Entry(self.frame, textvariable=self.slave_id, justify="center", width=7).grid(row=2, column=1, padx=5, pady=5)
Label(self.frame, text="Registre de départ :").grid(row=3, column=0, sticky="w", padx=5, pady=5)
Entry(self.frame, textvariable=self.start_register, justify="center", width=8).grid(row=3, column=1, padx=5, pady=5)
Label(self.frame, text="Délai (ms) :").grid(row=4, column=0, sticky="w", padx=5, pady=5)
Entry(self.frame, textvariable=self.delay_ms, justify="center", width=7).grid(row=4, column=1, padx=5, pady=5)
self.output_label = Label(self.frame, text="", justify="left", anchor="w", wraplength=400)
self.output_label.grid(row=5, column=0, columnspan=2, sticky="w", padx=5, pady=5)
self.test_button = Button(self.frame, text="Test IP", command=self.test_ip)
self.test_button.grid(row=6, column=0, padx=5, pady=5)
self.start_button = Button(self.frame, text="Démarrer Transmission", command=self.start_transmission, state="disabled")
self.start_button.grid(row=6, column=1, padx=5, pady=5)
self.stop_button = Button(self.frame, text="Arrêter Transmission", command=self.stop_transmission, state="disabled")
self.stop_button.grid(row=7, column=0, columnspan=2, padx=5, pady=5)
self.loop = asyncio.get_event_loop()
async def test_ip_async(self, ip):
proc = await asyncio.create_subprocess_exec(
"ping", "-n", "1", ip,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
await proc.communicate()
return proc.returncode == 0
def test_ip(self):
ip = self.ip.get()
self.loop.create_task(self._test_ip_and_notify(ip))
async def _test_ip_and_notify(self, ip):
self.output_label.config(text=f"Test de connexion à {ip}...")
if await self.test_ip_async(ip):
self.output_label.config(text="Vous êtes connecté au serveur Modbus")
self.start_button.config(state="normal")
self.stop_button.config(state="normal")
else:
self.output_label.config(text=f"L'adresse {ip} n'est pas joignable.")
self.start_button.config(state="disabled")
self.stop_button.config(state="disabled")
async def update_registers_with_datetime(self, ip, port, slave_id, start_register, delay_ms):
client = AsyncModbusTcpClient(ip, port=int(port))
try:
if not await client.connect():
raise ConnectionError(f"Impossible de se connecter à {ip}:{port}")
while self.running:
now = datetime.datetime.now()
values = [
now.weekday() + 1, # R1 : Numéro du jour (1 = Lundi, 7 = Dimanche)
int(f"{now.second:02}00", 16), # R2 : Secondes en BCD sous forme 16#SS00
int(f"{now.hour:02}{now.minute:02}", 16), # R3 : Heure et minute en BCD sous forme 16#HHMM
int(f"{now.month:02}{now.day:02}", 16), # R4 : Mois et jour en BCD sous forme 16#MMDD
int(f"{now.year:04}", 16), # R5 : Année en BCD sous forme 16#YYYY
]
output = "Transmission en cours...\n"
for i, value in enumerate(values):
try:
write_result = await client.write_register(
start_register + i, value, slave=int(slave_id)
)
if write_result.isError():
output += f"Erreur au registre {start_register + i}.\n"
else:
output += f"Registre {start_register + i}: {value}\n"
self.request_count += 1 # Incrémente le nombre de requêtes envoyées
except Exception as e:
output += f"Erreur lors de l'écriture au registre {start_register + i}: {e}\n"
# Affichage du nombre de requêtes envoyées
output += f"\nNombre de requêtes envoyées : {self.request_count//5}\n"
self.output_label.config(text=output)
await asyncio.sleep(delay_ms / 1000)
except asyncio.CancelledError:
self.output_label.config(text="Transmission arrêtée.")
except ConnectionError as e:
self.output_label.config(text=str(e))
except Exception as e:
messagebox.showerror("Erreur", f"Erreur lors de la transmission : {e}")
finally:
if client:
await client.close()
def start_transmission(self):
try:
ip = self.ip.get()
port = self.port.get()
slave_id = self.slave_id.get()
start_register = int(self.start_register.get())
delay_ms = int(self.delay_ms.get())
# Vérification des entrées
if not self.validate_ip(ip):
messagebox.showerror("Erreur", "L'adresse IP est invalide.")
return
if not self.validate_port(port):
messagebox.showerror("Erreur", "Le port doit être un nombre entier valide.")
return
if not self.validate_slave_id(slave_id):
messagebox.showerror("Erreur", "L'ID de l'esclave doit être un nombre entier valide.")
return
if not self.validate_register(start_register):
messagebox.showerror("Erreur", "Le registre de départ doit être un nombre entier positif.")
return
if not self.validate_delay_ms(delay_ms):
messagebox.showerror("Erreur", "Le délai doit être un nombre entier entre 10ms et 10000ms.")
return
self.running = True
self.loop.create_task(
self.update_registers_with_datetime(ip, port, slave_id, start_register, delay_ms)
)
self.start_button.config(state="disabled")
self.stop_button.config(state="normal")
except ValueError:
messagebox.showerror("Erreur", "Veuillez entrer des valeurs valides pour les champs.")
def stop_transmission(self):
self.running = False
self.quit_application()
def quit_application(self):
self.running = False
self.root.quit()
self.root.destroy()
def validate_ip(self, ip):
# Vérifier que l'IP est au format correct (ex: 192.168.0.1)
regex = r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$"
return re.match(regex, ip) is not None
def validate_port(self, port):
try:
port_int = int(port)
return 0 <= port_int <= 65535
except ValueError:
return False
def validate_slave_id(self, slave_id):
try:
slave_id_int = int(slave_id)
return 0 <= slave_id_int <= 255 # ID d'esclave Modbus de 0 à 255
except ValueError:
return False
def validate_register(self, start_register):
try:
return int(start_register) >= 0
except ValueError:
return False
def validate_delay_ms(self, delay_ms):
try:
delay_int = int(delay_ms)
return 10 <= delay_int <= 10000 # délai entre 10ms et 10s
except ValueError:
return False
if __name__ == "__main__":
root = Tk()
app = ModbusGUI(root)
# Intégrer asyncio et tkinter
def run_asyncio_loop():
try:
app.loop.run_until_complete(asyncio.sleep(0.1))
except Exception:
pass
root.after(100, run_asyncio_loop)
# Initialiser asyncio avec Tkinter
root.after(100, run_asyncio_loop)
root.mainloop()
SERVEUR :
Code : Tout sélectionner
import asyncio
import tkinter as tk
import sys
import re # Pour vérifier l'adresse IP
from tkinter import simpledialog
from pymodbus.server.async_io import ModbusTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSlaveContext, ModbusServerContext
import logging
from datetime import datetime
import threading
# Configuration du journal
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()
# Variables globales
server_task = None
loop = None
stop_event = asyncio.Event()
server_running = False
register_values_label = None # Label pour afficher les valeurs des registres
# Fonction pour valider l'adresse IP
def is_valid_ip(ip):
pattern = r"^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
return re.match(pattern, ip) is not None
# Fonction pour valider le numéro de port
def is_valid_port(port):
return port.isdigit() and 1 <= int(port) <= 65535
# Fonction pour valider l'ID de l'esclave
def is_valid_slave_id(slave_id):
return slave_id.isdigit() and 1 <= int(slave_id) <= 247
# Fonction pour mettre à jour les registres toutes les secondes
async def update_registers(context, slave_id):
global register_values_label
while not stop_event.is_set():
current_time = datetime.now()
# Remplacer week_number par current_time.weekday() + 1 pour obtenir le jour de la semaine (1 = lundi, 7 = dimanche)
day_of_week = current_time.weekday() + 1 # Lundi = 1, Mardi = 2, ...
# Conversion en BCD pour les registres
second_bcd = int(f"{current_time.second:02}00", 16)
hour_minute_bcd = int(f"{current_time.hour:02}{current_time.minute:02}", 16)
month_day_bcd = int(f"{current_time.month:02}{current_time.day:02}", 16)
year_bcd = int(f"{current_time.year}", 16)
register_values = [
day_of_week, # R1 : Numéro du jour de la semaine
second_bcd, # R2 : Seconde en BCD (16#SS00)
hour_minute_bcd, # R3 : Heure et minute en BCD (16#HHMM)
month_day_bcd, # R4 : Mois et jours en BCD (16#MMDD)
year_bcd # R5 : Année en BCD (16#YYYY)
]
register_values += [0] * (20 - len(register_values))
slave = context[slave_id]
slave.setValues(3, 1, register_values)
log.info(f"Valeurs des registres mises à jour : {register_values}")
# Mettre à jour le label dans tkinter pour afficher les 4 premiers registres en BCD
if register_values_label:
bcd_values = [
f"R1: {register_values[0]}", # Jour de la semaine
f"R2: {register_values[1]:04X}", # Seconde en BCD
f"R3: {register_values[2]:04X}", # Heure et minute en BCD
f"R4: {register_values[3]:04X}", # Mois et jour en BCD
f"R5: {register_values[4]:04X}" # Année
]
register_values_label.config(text="\n".join(bcd_values))
await asyncio.sleep(1)
def configure_server(ip_address, port, slave_id):
register_values = [0] * 20
store = ModbusSlaveContext(hr=ModbusSequentialDataBlock(1, register_values))
context = ModbusServerContext(slaves={slave_id: store}, single=False)
address = (ip_address, port)
return context, address
async def start_server(context, address):
server = ModbusTcpServer(context, address=address)
log.info(f"Serveur Modbus démarré sur {address[0]}:{address[1]}")
await server.serve_forever()
def show_popup():
global server_task, loop, stop_event, server_running, register_values_label
root = tk.Tk()
root.title("Configuration du Serveur Modbus")
window_width, window_height = 400, 500
screen_width = root.winfo_screenwidth()
screen_height = root.winfo_screenheight()
position_top = int(screen_height / 2 - window_height / 2)
position_right = int(screen_width / 2 - window_width / 2)
root.geometry(f'{window_width}x{window_height}+{position_right}+{position_top}')
label = tk.Label(root, text="Veuillez entrer les informations du serveur Modbus", font=("Arial", 12))
label.pack(pady=10)
# Créer des champs de saisie pour l'adresse IP, le port et l'ID de l'esclave
ip_label = tk.Label(root, text="Adresse IP du serveur :")
ip_label.pack(pady=5)
ip_entry = tk.Entry(root, justify="center", width=15)
ip_entry.insert(0, "127.0.0.1")
ip_entry.pack(pady=5)
port_label = tk.Label(root, text="Numéro de port :")
port_label.pack(pady=5)
port_entry = tk.Entry(root, justify="center", width=7)
port_entry.insert(0, "502")
port_entry.pack(pady=5)
slave_label = tk.Label(root, text="ID de l'esclave :")
slave_label.pack(pady=5)
slave_entry = tk.Entry(root, justify="center", width=4)
slave_entry.insert(0, "1")
slave_entry.pack(pady=5)
status_label = tk.Label(root, text="Serveur arrêté", font=("Arial", 14), fg="black")
status_label.pack(pady=10)
time_label = tk.Label(root, text="", font=("Arial", 10))
time_label.pack(pady=5)
# Label pour afficher les valeurs des registres
register_values_label = tk.Label(root, text="", font=("Arial", 10))
register_values_label.pack(pady=10)
def update_time():
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
time_label.config(text=current_time)
root.after(1000, update_time)
update_time()
def toggle_server_button():
global server_task, loop, stop_event, server_running
ip_address = ip_entry.get()
port = port_entry.get()
slave_id = slave_entry.get()
# Validation des entrées
if not is_valid_ip(ip_address):
status_label.config(text="Adresse IP invalide", fg="red")
return
if not is_valid_port(port):
status_label.config(text="Numéro de port invalide", fg="red")
return
if not is_valid_slave_id(slave_id):
status_label.config(text="ID d'esclave invalide", fg="red")
return
ip_address = ip_address or "192.168.1.212"
port = int(port) or 502
slave_id = int(slave_id) or 1
if server_running:
stop_event.set()
server_task.cancel()
loop.stop()
status_label.config(text="Serveur arrêté", fg="black")
start_button.config(text="Démarrer le Serveur")
log.info("Serveur Modbus arrêté.")
root.quit()
sys.exit()
else:
def start_asyncio():
global server_task, loop, stop_event
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
context, address = configure_server(ip_address, port, slave_id)
server_task = loop.create_task(start_server(context, address))
loop.create_task(update_registers(context, slave_id))
start_button.config(text="Arrêter le Serveur")
status_label.config(text="Serveur en ligne", fg="green")
loop.run_forever()
thread = threading.Thread(target=start_asyncio)
thread.start()
server_running = not server_running
start_button = tk.Button(root, text="Démarrer le Serveur", command=toggle_server_button)
start_button.pack(pady=10)
def on_closing():
if server_running:
stop_event.set()
server_task.cancel()
loop.stop()
log.info("Serveur Modbus arrêté.")
root.quit()
sys.exit()
root.protocol("WM_DELETE_WINDOW", on_closing)
# Ajouter un message en bas de la fenêtre
footer_label = tk.Label(root, text="Logiciel gratuit et sans droit", font=("Arial", 8), fg="gray")
footer_label.pack(side="bottom", pady=10)
root.mainloop()
if __name__ == "__main__":
show_popup()
Le we-transfert avec les archives et les certificats inclus :
https://we.tl/t-HGjOPcDLHx
M'envoyer un MP si le lien est mort
