Si, si, les fonctions WRTC et RRTC existent bien dans PL7 Pro.
Mais si la bibliothèque "Dates, Heures et Durées" n'est pas utilisée dans l'application PL7, on ne peut pas l'ajouter en RUN dans l'automate.

Si, si, les fonctions WRTC et RRTC existent bien dans PL7 Pro.


Oula, plutot 'roots' le site. En plus ca parle un français bizarre, je comprends pas toutBruce33 a écrit : ↑06 déc. 2024, 19:21 Ah ben j'vois qu'on s'amuse bien ici !![]()
Merci pour ces cadeaux avant Noël.
Si vous avez besoin de vous défouler avec des épreuves de programmation, participez donc à l'Advent of Code 2024 (un calendrier de l'avent qui propose une épreuve de programmation chaque jour).
Bonnes idées ! En plus c'est pas trop compliqué a faire



Code : Tout sélectionner
import asyncio
import datetime
from tkinter import Tk, Label, Entry, Button, messagebox, StringVar, Frame
from pymodbus.client import AsyncModbusTcpClient
import re
class ModbusGUI:
def __init__(self, root):
self.root = root
self.root.title("Transmission Modbus")
self.root.protocol("WM_DELETE_WINDOW", self.quit_application)
# Variables
self.ip = StringVar(value="127.0.0.1")
self.port = StringVar(value="502")
self.slave_id = StringVar(value="1")
self.start_register = StringVar(value="0")
self.delay_ms = StringVar(value="1000")
self.running = False
self.request_count = 0 # Compteur pour les requêtes envoyées
# Frame principal pour centrer les champs de saisie
self.frame = Frame(root)
self.frame.pack(padx=10, pady=10, expand=True, fill="both")
# Widgets
Label(self.frame, text="Adresse IP :").grid(row=0, column=0, sticky="w", padx=5, pady=5)
Entry(self.frame, textvariable=self.ip, justify="center", width=15).grid(row=0, column=1, padx=5, pady=5)
Label(self.frame, text="Port :").grid(row=1, column=0, sticky="w", padx=5, pady=5)
Entry(self.frame, textvariable=self.port, justify="center", width=7).grid(row=1, column=1, padx=5, pady=5)
Label(self.frame, text="ID Esclave :").grid(row=2, column=0, sticky="w", padx=5, pady=5)
Entry(self.frame, textvariable=self.slave_id, justify="center", width=7).grid(row=2, column=1, padx=5, pady=5)
Label(self.frame, text="Registre de départ :").grid(row=3, column=0, sticky="w", padx=5, pady=5)
Entry(self.frame, textvariable=self.start_register, justify="center", width=8).grid(row=3, column=1, padx=5, pady=5)
Label(self.frame, text="Délai (ms) :").grid(row=4, column=0, sticky="w", padx=5, pady=5)
Entry(self.frame, textvariable=self.delay_ms, justify="center", width=7).grid(row=4, column=1, padx=5, pady=5)
self.output_label = Label(self.frame, text="", justify="left", anchor="w", wraplength=400)
self.output_label.grid(row=5, column=0, columnspan=2, sticky="w", padx=5, pady=5)
self.test_button = Button(self.frame, text="Test IP", command=self.test_ip)
self.test_button.grid(row=6, column=0, padx=5, pady=5)
self.start_button = Button(self.frame, text="Démarrer Transmission", command=self.start_transmission, state="disabled")
self.start_button.grid(row=6, column=1, padx=5, pady=5)
self.stop_button = Button(self.frame, text="Arrêter Transmission", command=self.stop_transmission, state="disabled")
self.stop_button.grid(row=7, column=0, columnspan=2, padx=5, pady=5)
self.loop = asyncio.get_event_loop()
async def test_ip_async(self, ip):
proc = await asyncio.create_subprocess_exec(
"ping", "-n", "1", ip,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
await proc.communicate()
return proc.returncode == 0
def test_ip(self):
ip = self.ip.get()
self.loop.create_task(self._test_ip_and_notify(ip))
async def _test_ip_and_notify(self, ip):
self.output_label.config(text=f"Test de connexion à {ip}...")
if await self.test_ip_async(ip):
self.output_label.config(text="Vous êtes connecté au serveur Modbus")
self.start_button.config(state="normal")
self.stop_button.config(state="normal")
else:
self.output_label.config(text=f"L'adresse {ip} n'est pas joignable.")
self.start_button.config(state="disabled")
self.stop_button.config(state="disabled")
async def update_registers_with_datetime(self, ip, port, slave_id, start_register, delay_ms):
client = AsyncModbusTcpClient(ip, port=int(port))
try:
if not await client.connect():
raise ConnectionError(f"Impossible de se connecter à {ip}:{port}")
while self.running:
now = datetime.datetime.now()
values = [
now.weekday() + 1, # R1 : Numéro du jour (1 = Lundi, 7 = Dimanche)
int(f"{now.second:02}00", 16), # R2 : Secondes en BCD sous forme 16#SS00
int(f"{now.hour:02}{now.minute:02}", 16), # R3 : Heure et minute en BCD sous forme 16#HHMM
int(f"{now.month:02}{now.day:02}", 16), # R4 : Mois et jour en BCD sous forme 16#MMDD
int(f"{now.year:04}", 16), # R5 : Année en BCD sous forme 16#YYYY
]
output = "Transmission en cours...\n"
for i, value in enumerate(values):
try:
write_result = await client.write_register(
start_register + i, value, slave=int(slave_id)
)
if write_result.isError():
output += f"Erreur au registre {start_register + i}.\n"
else:
output += f"Registre {start_register + i}: {value}\n"
self.request_count += 1 # Incrémente le nombre de requêtes envoyées
except Exception as e:
output += f"Erreur lors de l'écriture au registre {start_register + i}: {e}\n"
# Affichage du nombre de requêtes envoyées
output += f"\nNombre de requêtes envoyées : {self.request_count//5}\n"
self.output_label.config(text=output)
await asyncio.sleep(delay_ms / 1000)
except asyncio.CancelledError:
self.output_label.config(text="Transmission arrêtée.")
except ConnectionError as e:
self.output_label.config(text=str(e))
except Exception as e:
messagebox.showerror("Erreur", f"Erreur lors de la transmission : {e}")
finally:
if client:
await client.close()
def start_transmission(self):
try:
ip = self.ip.get()
port = self.port.get()
slave_id = self.slave_id.get()
start_register = int(self.start_register.get())
delay_ms = int(self.delay_ms.get())
# Vérification des entrées
if not self.validate_ip(ip):
messagebox.showerror("Erreur", "L'adresse IP est invalide.")
return
if not self.validate_port(port):
messagebox.showerror("Erreur", "Le port doit être un nombre entier valide.")
return
if not self.validate_slave_id(slave_id):
messagebox.showerror("Erreur", "L'ID de l'esclave doit être un nombre entier valide.")
return
if not self.validate_register(start_register):
messagebox.showerror("Erreur", "Le registre de départ doit être un nombre entier positif.")
return
if not self.validate_delay_ms(delay_ms):
messagebox.showerror("Erreur", "Le délai doit être un nombre entier entre 10ms et 10000ms.")
return
self.running = True
self.loop.create_task(
self.update_registers_with_datetime(ip, port, slave_id, start_register, delay_ms)
)
self.start_button.config(state="disabled")
self.stop_button.config(state="normal")
except ValueError:
messagebox.showerror("Erreur", "Veuillez entrer des valeurs valides pour les champs.")
def stop_transmission(self):
self.running = False
self.quit_application()
def quit_application(self):
self.running = False
self.root.quit()
self.root.destroy()
def validate_ip(self, ip):
# Vérifier que l'IP est au format correct (ex: 192.168.0.1)
regex = r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$"
return re.match(regex, ip) is not None
def validate_port(self, port):
try:
port_int = int(port)
return 0 <= port_int <= 65535
except ValueError:
return False
def validate_slave_id(self, slave_id):
try:
slave_id_int = int(slave_id)
return 0 <= slave_id_int <= 255 # ID d'esclave Modbus de 0 à 255
except ValueError:
return False
def validate_register(self, start_register):
try:
return int(start_register) >= 0
except ValueError:
return False
def validate_delay_ms(self, delay_ms):
try:
delay_int = int(delay_ms)
return 10 <= delay_int <= 10000 # délai entre 10ms et 10s
except ValueError:
return False
if __name__ == "__main__":
root = Tk()
app = ModbusGUI(root)
# Intégrer asyncio et tkinter
def run_asyncio_loop():
try:
app.loop.run_until_complete(asyncio.sleep(0.1))
except Exception:
pass
root.after(100, run_asyncio_loop)
# Initialiser asyncio avec Tkinter
root.after(100, run_asyncio_loop)
root.mainloop()
Code : Tout sélectionner
import asyncio
import tkinter as tk
import sys
import re # Pour vérifier l'adresse IP
from tkinter import simpledialog
from pymodbus.server.async_io import ModbusTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSlaveContext, ModbusServerContext
import logging
from datetime import datetime
import threading
# Configuration du journal
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()
# Variables globales
server_task = None
loop = None
stop_event = asyncio.Event()
server_running = False
register_values_label = None # Label pour afficher les valeurs des registres
# Fonction pour valider l'adresse IP
def is_valid_ip(ip):
pattern = r"^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
return re.match(pattern, ip) is not None
# Fonction pour valider le numéro de port
def is_valid_port(port):
return port.isdigit() and 1 <= int(port) <= 65535
# Fonction pour valider l'ID de l'esclave
def is_valid_slave_id(slave_id):
return slave_id.isdigit() and 1 <= int(slave_id) <= 247
# Fonction pour mettre à jour les registres toutes les secondes
async def update_registers(context, slave_id):
global register_values_label
while not stop_event.is_set():
current_time = datetime.now()
# Remplacer week_number par current_time.weekday() + 1 pour obtenir le jour de la semaine (1 = lundi, 7 = dimanche)
day_of_week = current_time.weekday() + 1 # Lundi = 1, Mardi = 2, ...
# Conversion en BCD pour les registres
second_bcd = int(f"{current_time.second:02}00", 16)
hour_minute_bcd = int(f"{current_time.hour:02}{current_time.minute:02}", 16)
month_day_bcd = int(f"{current_time.month:02}{current_time.day:02}", 16)
year_bcd = int(f"{current_time.year}", 16)
register_values = [
day_of_week, # R1 : Numéro du jour de la semaine
second_bcd, # R2 : Seconde en BCD (16#SS00)
hour_minute_bcd, # R3 : Heure et minute en BCD (16#HHMM)
month_day_bcd, # R4 : Mois et jours en BCD (16#MMDD)
year_bcd # R5 : Année en BCD (16#YYYY)
]
register_values += [0] * (20 - len(register_values))
slave = context[slave_id]
slave.setValues(3, 1, register_values)
log.info(f"Valeurs des registres mises à jour : {register_values}")
# Mettre à jour le label dans tkinter pour afficher les 4 premiers registres en BCD
if register_values_label:
bcd_values = [
f"R1: {register_values[0]}", # Jour de la semaine
f"R2: {register_values[1]:04X}", # Seconde en BCD
f"R3: {register_values[2]:04X}", # Heure et minute en BCD
f"R4: {register_values[3]:04X}", # Mois et jour en BCD
f"R5: {register_values[4]:04X}" # Année
]
register_values_label.config(text="\n".join(bcd_values))
await asyncio.sleep(1)
def configure_server(ip_address, port, slave_id):
register_values = [0] * 20
store = ModbusSlaveContext(hr=ModbusSequentialDataBlock(1, register_values))
context = ModbusServerContext(slaves={slave_id: store}, single=False)
address = (ip_address, port)
return context, address
async def start_server(context, address):
server = ModbusTcpServer(context, address=address)
log.info(f"Serveur Modbus démarré sur {address[0]}:{address[1]}")
await server.serve_forever()
def show_popup():
global server_task, loop, stop_event, server_running, register_values_label
root = tk.Tk()
root.title("Configuration du Serveur Modbus")
window_width, window_height = 400, 500
screen_width = root.winfo_screenwidth()
screen_height = root.winfo_screenheight()
position_top = int(screen_height / 2 - window_height / 2)
position_right = int(screen_width / 2 - window_width / 2)
root.geometry(f'{window_width}x{window_height}+{position_right}+{position_top}')
label = tk.Label(root, text="Veuillez entrer les informations du serveur Modbus", font=("Arial", 12))
label.pack(pady=10)
# Créer des champs de saisie pour l'adresse IP, le port et l'ID de l'esclave
ip_label = tk.Label(root, text="Adresse IP du serveur :")
ip_label.pack(pady=5)
ip_entry = tk.Entry(root, justify="center", width=15)
ip_entry.insert(0, "127.0.0.1")
ip_entry.pack(pady=5)
port_label = tk.Label(root, text="Numéro de port :")
port_label.pack(pady=5)
port_entry = tk.Entry(root, justify="center", width=7)
port_entry.insert(0, "502")
port_entry.pack(pady=5)
slave_label = tk.Label(root, text="ID de l'esclave :")
slave_label.pack(pady=5)
slave_entry = tk.Entry(root, justify="center", width=4)
slave_entry.insert(0, "1")
slave_entry.pack(pady=5)
status_label = tk.Label(root, text="Serveur arrêté", font=("Arial", 14), fg="black")
status_label.pack(pady=10)
time_label = tk.Label(root, text="", font=("Arial", 10))
time_label.pack(pady=5)
# Label pour afficher les valeurs des registres
register_values_label = tk.Label(root, text="", font=("Arial", 10))
register_values_label.pack(pady=10)
def update_time():
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
time_label.config(text=current_time)
root.after(1000, update_time)
update_time()
def toggle_server_button():
global server_task, loop, stop_event, server_running
ip_address = ip_entry.get()
port = port_entry.get()
slave_id = slave_entry.get()
# Validation des entrées
if not is_valid_ip(ip_address):
status_label.config(text="Adresse IP invalide", fg="red")
return
if not is_valid_port(port):
status_label.config(text="Numéro de port invalide", fg="red")
return
if not is_valid_slave_id(slave_id):
status_label.config(text="ID d'esclave invalide", fg="red")
return
ip_address = ip_address or "192.168.1.212"
port = int(port) or 502
slave_id = int(slave_id) or 1
if server_running:
stop_event.set()
server_task.cancel()
loop.stop()
status_label.config(text="Serveur arrêté", fg="black")
start_button.config(text="Démarrer le Serveur")
log.info("Serveur Modbus arrêté.")
root.quit()
sys.exit()
else:
def start_asyncio():
global server_task, loop, stop_event
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
context, address = configure_server(ip_address, port, slave_id)
server_task = loop.create_task(start_server(context, address))
loop.create_task(update_registers(context, slave_id))
start_button.config(text="Arrêter le Serveur")
status_label.config(text="Serveur en ligne", fg="green")
loop.run_forever()
thread = threading.Thread(target=start_asyncio)
thread.start()
server_running = not server_running
start_button = tk.Button(root, text="Démarrer le Serveur", command=toggle_server_button)
start_button.pack(pady=10)
def on_closing():
if server_running:
stop_event.set()
server_task.cancel()
loop.stop()
log.info("Serveur Modbus arrêté.")
root.quit()
sys.exit()
root.protocol("WM_DELETE_WINDOW", on_closing)
# Ajouter un message en bas de la fenêtre
footer_label = tk.Label(root, text="Logiciel gratuit et sans droit", font=("Arial", 8), fg="gray")
footer_label.pack(side="bottom", pady=10)
root.mainloop()
if __name__ == "__main__":
show_popup()


Merci, ça fait toujours plaisir

En regardant ton image, j'ai vu la version de ton modbus doctor ! Il y a la version 2.10 maintenant tu sais !MiGaNuTs a écrit : ↑09 déc. 2024, 08:47 2024-12-09 08_27_09-Window.png
En BCD c'est fait(mais pas encore publié)
En UTC c'est faisable facilement, surtout que je donne le code source
Ce qui m'anime c'est d'ajouter un compteur de requêtes reçues, mais pour le moment je n’y arrive pas.
J'ai pas abandonné l'idée. Ça va juste me prendre plus de temps qu'espéré.

Elle marche pas sous windows XPandala a écrit : ↑09 déc. 2024, 16:43En regardant ton image, j'ai vu la version de ton modbus doctor ! Il y a la version 2.10 maintenant tu sais !MiGaNuTs a écrit : ↑09 déc. 2024, 08:47 2024-12-09 08_27_09-Window.png
En BCD c'est fait(mais pas encore publié)
En UTC c'est faisable facilement, surtout que je donne le code source
Ce qui m'anime c'est d'ajouter un compteur de requêtes reçues, mais pour le moment je n’y arrive pas.
J'ai pas abandonné l'idée. Ça va juste me prendre plus de temps qu'espéré.![]()
![]()
![]()