Page 4 sur 5

Re: Recuperer l'heure d'un PC

Posté : 06 déc. 2024, 19:28
par Bruce33
JC87 a écrit : 05 déc. 2024, 10:00 Il faudrait adapter pour un 57 en PL7 Pro mais le principe est le même, c'est juste la fonction WRTC_DT qui n'existe pas donc le code sera un peu différent et il faudrait jouer avec le bit système %S48
Si, si, les fonctions WRTC et RRTC existent bien dans PL7 Pro.
Mais si la bibliothèque "Dates, Heures et Durées" n'est pas utilisée dans l'application PL7, on ne peut pas l'ajouter en RUN dans l'automate.

Re: Recuperer l'heure d'un PC

Posté : 06 déc. 2024, 21:16
par andala
Pas faux, je m'y attelle des que je peux

Re: Recuperer l'heure d'un PC

Posté : 07 déc. 2024, 13:05
par MiGaNuTs
Bruce33 a écrit : 06 déc. 2024, 19:21 Ah ben j'vois qu'on s'amuse bien ici ! :)

Merci pour ces cadeaux avant Noël.
Si vous avez besoin de vous défouler avec des épreuves de programmation, participez donc à l'Advent of Code 2024 (un calendrier de l'avent qui propose une épreuve de programmation chaque jour).
Oula, plutot 'roots' le site. En plus ca parle un français bizarre, je comprends pas tout :)
Je vais passer mon tour :)
Bruce33 a écrit : 06 déc. 2024, 19:21 Sinon pour améliorer votre serveur-horloge pour automate (je ne l'ai pas encore testé) on peut ajouter différents formats afin de réduire au maximum la programmation dans l'automate : format BCD, heure locale et heure UTC...
Bonnes idées ! En plus c'est pas trop compliqué a faire :). C'est vrai que ça simplifie (un tout petit peu) le code coté automate. Pratique pour les faignasses :)
Pourquoi pas une info si le jour est férié ou pas également, mais la ça se complique un peu :)

Ce que j'aimerai bien faire c'est un compteur de requêtes reçues, mais je comprends rien a comment je pourrais faire ça.

Et aussi utiliser la petite fenêtre d'affichage pour y mettre des informations. Ça c'est gérable facilement aussi.

Re: Recuperer l'heure d'un PC

Posté : 09 déc. 2024, 08:47
par MiGaNuTs
2024-12-09 08_27_09-Window.png
En BCD c'est fait :) (mais pas encore publié)

En UTC c'est faisable facilement, surtout que je donne le code source :)

Ce qui m'anime c'est d'ajouter un compteur de requêtes reçues, mais pour le moment je n’y arrive pas.
J'ai pas abandonné l'idée. Ça va juste me prendre plus de temps qu'espéré.

Re: Recuperer l'heure d'un PC

Posté : 09 déc. 2024, 09:03
par andala
Je suis content que ça ne serve pas seulement qu'a moi et que tout le monde participe !
ça me fait chaud au coeur

Re: Recuperer l'heure d'un PC

Posté : 09 déc. 2024, 10:40
par andala
Code corrigé pour transmettre en BCD dans l'ordre exacte pour Schneider avec un compteur de requête en mode client :

CLIENT :

Code : Tout sélectionner

import asyncio
import datetime
from tkinter import Tk, Label, Entry, Button, messagebox, StringVar, Frame
from pymodbus.client import AsyncModbusTcpClient
import re


class ModbusGUI:
    def __init__(self, root):
        self.root = root
        self.root.title("Transmission Modbus")
        self.root.protocol("WM_DELETE_WINDOW", self.quit_application)

        # Variables
        self.ip = StringVar(value="127.0.0.1")
        self.port = StringVar(value="502")
        self.slave_id = StringVar(value="1")
        self.start_register = StringVar(value="0")
        self.delay_ms = StringVar(value="1000")
        self.running = False
        self.request_count = 0  # Compteur pour les requêtes envoyées

        # Frame principal pour centrer les champs de saisie
        self.frame = Frame(root)
        self.frame.pack(padx=10, pady=10, expand=True, fill="both")

        # Widgets
        Label(self.frame, text="Adresse IP :").grid(row=0, column=0, sticky="w", padx=5, pady=5)
        Entry(self.frame, textvariable=self.ip, justify="center", width=15).grid(row=0, column=1, padx=5, pady=5)

        Label(self.frame, text="Port :").grid(row=1, column=0, sticky="w", padx=5, pady=5)
        Entry(self.frame, textvariable=self.port, justify="center", width=7).grid(row=1, column=1, padx=5, pady=5)

        Label(self.frame, text="ID Esclave :").grid(row=2, column=0, sticky="w", padx=5, pady=5)
        Entry(self.frame, textvariable=self.slave_id, justify="center", width=7).grid(row=2, column=1, padx=5, pady=5)

        Label(self.frame, text="Registre de départ :").grid(row=3, column=0, sticky="w", padx=5, pady=5)
        Entry(self.frame, textvariable=self.start_register, justify="center", width=8).grid(row=3, column=1, padx=5, pady=5)

        Label(self.frame, text="Délai (ms) :").grid(row=4, column=0, sticky="w", padx=5, pady=5)
        Entry(self.frame, textvariable=self.delay_ms, justify="center", width=7).grid(row=4, column=1, padx=5, pady=5)

        self.output_label = Label(self.frame, text="", justify="left", anchor="w", wraplength=400)
        self.output_label.grid(row=5, column=0, columnspan=2, sticky="w", padx=5, pady=5)

        self.test_button = Button(self.frame, text="Test IP", command=self.test_ip)
        self.test_button.grid(row=6, column=0, padx=5, pady=5)

        self.start_button = Button(self.frame, text="Démarrer Transmission", command=self.start_transmission, state="disabled")
        self.start_button.grid(row=6, column=1, padx=5, pady=5)

        self.stop_button = Button(self.frame, text="Arrêter Transmission", command=self.stop_transmission, state="disabled")
        self.stop_button.grid(row=7, column=0, columnspan=2, padx=5, pady=5)

        self.loop = asyncio.get_event_loop()

    async def test_ip_async(self, ip):
        proc = await asyncio.create_subprocess_exec(
            "ping", "-n", "1", ip,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        await proc.communicate()
        return proc.returncode == 0

    def test_ip(self):
        ip = self.ip.get()
        self.loop.create_task(self._test_ip_and_notify(ip))

    async def _test_ip_and_notify(self, ip):
        self.output_label.config(text=f"Test de connexion à {ip}...")
        if await self.test_ip_async(ip):
            self.output_label.config(text="Vous êtes connecté au serveur Modbus")
            self.start_button.config(state="normal")
            self.stop_button.config(state="normal")
        else:
            self.output_label.config(text=f"L'adresse {ip} n'est pas joignable.")
            self.start_button.config(state="disabled")
            self.stop_button.config(state="disabled")

    async def update_registers_with_datetime(self, ip, port, slave_id, start_register, delay_ms):
        client = AsyncModbusTcpClient(ip, port=int(port))
        try:
            if not await client.connect():
                raise ConnectionError(f"Impossible de se connecter à {ip}:{port}")
            
            while self.running:
                now = datetime.datetime.now()
                values = [
                    now.weekday() + 1,                          # R1 : Numéro du jour (1 = Lundi, 7 = Dimanche)
                    int(f"{now.second:02}00", 16),              # R2 : Secondes en BCD sous forme 16#SS00
                    int(f"{now.hour:02}{now.minute:02}", 16),   # R3 : Heure et minute en BCD sous forme 16#HHMM
                    int(f"{now.month:02}{now.day:02}", 16),     # R4 : Mois et jour en BCD sous forme 16#MMDD
                    int(f"{now.year:04}", 16),                  # R5 : Année en BCD sous forme 16#YYYY
                ]

                output = "Transmission en cours...\n"
                for i, value in enumerate(values):
                    try:
                        write_result = await client.write_register(
                            start_register + i, value, slave=int(slave_id)
                        )
                        if write_result.isError():
                            output += f"Erreur au registre {start_register + i}.\n"
                        else:
                            output += f"Registre {start_register + i}: {value}\n"
                            self.request_count += 1  # Incrémente le nombre de requêtes envoyées
                    except Exception as e:
                        output += f"Erreur lors de l'écriture au registre {start_register + i}: {e}\n"

                # Affichage du nombre de requêtes envoyées
                output += f"\nNombre de requêtes envoyées : {self.request_count//5}\n"
                self.output_label.config(text=output)
                await asyncio.sleep(delay_ms / 1000)

        except asyncio.CancelledError:
            self.output_label.config(text="Transmission arrêtée.")
        except ConnectionError as e:
            self.output_label.config(text=str(e))
        except Exception as e:
            messagebox.showerror("Erreur", f"Erreur lors de la transmission : {e}")
        finally:
            if client:
                await client.close()

    def start_transmission(self):
        try:
            ip = self.ip.get()
            port = self.port.get()
            slave_id = self.slave_id.get()
            start_register = int(self.start_register.get())
            delay_ms = int(self.delay_ms.get())

            # Vérification des entrées
            if not self.validate_ip(ip):
                messagebox.showerror("Erreur", "L'adresse IP est invalide.")
                return

            if not self.validate_port(port):
                messagebox.showerror("Erreur", "Le port doit être un nombre entier valide.")
                return

            if not self.validate_slave_id(slave_id):
                messagebox.showerror("Erreur", "L'ID de l'esclave doit être un nombre entier valide.")
                return

            if not self.validate_register(start_register):
                messagebox.showerror("Erreur", "Le registre de départ doit être un nombre entier positif.")
                return

            if not self.validate_delay_ms(delay_ms):
                messagebox.showerror("Erreur", "Le délai doit être un nombre entier entre 10ms et 10000ms.")
                return

            self.running = True
            self.loop.create_task(
                self.update_registers_with_datetime(ip, port, slave_id, start_register, delay_ms)
            )
            self.start_button.config(state="disabled")
            self.stop_button.config(state="normal")
        except ValueError:
            messagebox.showerror("Erreur", "Veuillez entrer des valeurs valides pour les champs.")

    def stop_transmission(self):
        self.running = False
        self.quit_application()

    def quit_application(self):
        self.running = False
        self.root.quit()
        self.root.destroy()

    def validate_ip(self, ip):
        # Vérifier que l'IP est au format correct (ex: 192.168.0.1)
        regex = r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$"
        return re.match(regex, ip) is not None

    def validate_port(self, port):
        try:
            port_int = int(port)
            return 0 <= port_int <= 65535
        except ValueError:
            return False

    def validate_slave_id(self, slave_id):
        try:
            slave_id_int = int(slave_id)
            return 0 <= slave_id_int <= 255  # ID d'esclave Modbus de 0 à 255
        except ValueError:
            return False

    def validate_register(self, start_register):
        try:
            return int(start_register) >= 0
        except ValueError:
            return False

    def validate_delay_ms(self, delay_ms):
        try:
            delay_int = int(delay_ms)
            return 10 <= delay_int <= 10000  # délai entre 10ms et 10s
        except ValueError:
            return False


if __name__ == "__main__":
    root = Tk()
    app = ModbusGUI(root)

    # Intégrer asyncio et tkinter
    def run_asyncio_loop():
        try:
            app.loop.run_until_complete(asyncio.sleep(0.1))
        except Exception:
            pass
        root.after(100, run_asyncio_loop)

    # Initialiser asyncio avec Tkinter
    root.after(100, run_asyncio_loop)
    root.mainloop()
SERVEUR :

Code : Tout sélectionner

import asyncio
import tkinter as tk
import sys
import re  # Pour vérifier l'adresse IP
from tkinter import simpledialog
from pymodbus.server.async_io import ModbusTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSlaveContext, ModbusServerContext
import logging
from datetime import datetime
import threading

# Configuration du journal
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()

# Variables globales
server_task = None
loop = None
stop_event = asyncio.Event()
server_running = False
register_values_label = None  # Label pour afficher les valeurs des registres

# Fonction pour valider l'adresse IP
def is_valid_ip(ip):
    pattern = r"^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
    return re.match(pattern, ip) is not None

# Fonction pour valider le numéro de port
def is_valid_port(port):
    return port.isdigit() and 1 <= int(port) <= 65535

# Fonction pour valider l'ID de l'esclave
def is_valid_slave_id(slave_id):
    return slave_id.isdigit() and 1 <= int(slave_id) <= 247

# Fonction pour mettre à jour les registres toutes les secondes
async def update_registers(context, slave_id):
    global register_values_label
    while not stop_event.is_set():
        current_time = datetime.now()
        # Remplacer week_number par current_time.weekday() + 1 pour obtenir le jour de la semaine (1 = lundi, 7 = dimanche)
        day_of_week = current_time.weekday() + 1  # Lundi = 1, Mardi = 2, ...

        # Conversion en BCD pour les registres
        second_bcd = int(f"{current_time.second:02}00", 16)
        hour_minute_bcd = int(f"{current_time.hour:02}{current_time.minute:02}", 16)
        month_day_bcd = int(f"{current_time.month:02}{current_time.day:02}", 16)
        year_bcd = int(f"{current_time.year}", 16)

        register_values = [
            day_of_week,         # R1 : Numéro du jour de la semaine
            second_bcd,          # R2 : Seconde en BCD (16#SS00)
            hour_minute_bcd,     # R3 : Heure et minute en BCD (16#HHMM)
            month_day_bcd,       # R4 : Mois et jours en BCD (16#MMDD)
            year_bcd             # R5 : Année en BCD (16#YYYY)
        ]
        register_values += [0] * (20 - len(register_values))

        slave = context[slave_id]
        slave.setValues(3, 1, register_values)
        log.info(f"Valeurs des registres mises à jour : {register_values}")

        # Mettre à jour le label dans tkinter pour afficher les 4 premiers registres en BCD
        if register_values_label:
            bcd_values = [
                f"R1: {register_values[0]}",                  # Jour de la semaine
                f"R2: {register_values[1]:04X}",              # Seconde en BCD
                f"R3: {register_values[2]:04X}",              # Heure et minute en BCD
                f"R4: {register_values[3]:04X}",              # Mois et jour en BCD
                f"R5: {register_values[4]:04X}"                   # Année
            ]
            register_values_label.config(text="\n".join(bcd_values))

        await asyncio.sleep(1)

def configure_server(ip_address, port, slave_id):
    register_values = [0] * 20
    store = ModbusSlaveContext(hr=ModbusSequentialDataBlock(1, register_values))
    context = ModbusServerContext(slaves={slave_id: store}, single=False)
    address = (ip_address, port)
    return context, address

async def start_server(context, address):
    server = ModbusTcpServer(context, address=address)
    log.info(f"Serveur Modbus démarré sur {address[0]}:{address[1]}")
    await server.serve_forever()

def show_popup():
    global server_task, loop, stop_event, server_running, register_values_label

    root = tk.Tk()
    root.title("Configuration du Serveur Modbus")

    window_width, window_height = 400, 500
    screen_width = root.winfo_screenwidth()
    screen_height = root.winfo_screenheight()
    position_top = int(screen_height / 2 - window_height / 2)
    position_right = int(screen_width / 2 - window_width / 2)
    root.geometry(f'{window_width}x{window_height}+{position_right}+{position_top}')

    label = tk.Label(root, text="Veuillez entrer les informations du serveur Modbus", font=("Arial", 12))
    label.pack(pady=10)

    # Créer des champs de saisie pour l'adresse IP, le port et l'ID de l'esclave
    ip_label = tk.Label(root, text="Adresse IP du serveur :")
    ip_label.pack(pady=5)
    ip_entry = tk.Entry(root, justify="center", width=15)
    ip_entry.insert(0, "127.0.0.1")
    ip_entry.pack(pady=5)

    port_label = tk.Label(root, text="Numéro de port :")
    port_label.pack(pady=5)
    port_entry = tk.Entry(root, justify="center", width=7)
    port_entry.insert(0, "502")
    port_entry.pack(pady=5)

    slave_label = tk.Label(root, text="ID de l'esclave :")
    slave_label.pack(pady=5)
    slave_entry = tk.Entry(root, justify="center", width=4)
    slave_entry.insert(0, "1")
    slave_entry.pack(pady=5)

    status_label = tk.Label(root, text="Serveur arrêté", font=("Arial", 14), fg="black")
    status_label.pack(pady=10)

    time_label = tk.Label(root, text="", font=("Arial", 10))
    time_label.pack(pady=5)

    # Label pour afficher les valeurs des registres
    register_values_label = tk.Label(root, text="", font=("Arial", 10))
    register_values_label.pack(pady=10)

    def update_time():
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        time_label.config(text=current_time)
        root.after(1000, update_time)

    update_time()

    def toggle_server_button():
        global server_task, loop, stop_event, server_running

        ip_address = ip_entry.get()
        port = port_entry.get()
        slave_id = slave_entry.get()

        # Validation des entrées
        if not is_valid_ip(ip_address):
            status_label.config(text="Adresse IP invalide", fg="red")
            return
        if not is_valid_port(port):
            status_label.config(text="Numéro de port invalide", fg="red")
            return
        if not is_valid_slave_id(slave_id):
            status_label.config(text="ID d'esclave invalide", fg="red")
            return

        ip_address = ip_address or "192.168.1.212"
        port = int(port) or 502
        slave_id = int(slave_id) or 1

        if server_running:
            stop_event.set()
            server_task.cancel()
            loop.stop()
            status_label.config(text="Serveur arrêté", fg="black")
            start_button.config(text="Démarrer le Serveur")
            log.info("Serveur Modbus arrêté.")
            root.quit()
            sys.exit()
        else:
            def start_asyncio():
                global server_task, loop, stop_event
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

                context, address = configure_server(ip_address, port, slave_id)

                server_task = loop.create_task(start_server(context, address))
                loop.create_task(update_registers(context, slave_id))

                start_button.config(text="Arrêter le Serveur")
                status_label.config(text="Serveur en ligne", fg="green")
                loop.run_forever()

            thread = threading.Thread(target=start_asyncio)
            thread.start()

        server_running = not server_running

    start_button = tk.Button(root, text="Démarrer le Serveur", command=toggle_server_button)
    start_button.pack(pady=10)

    def on_closing():
        if server_running:
            stop_event.set()
            server_task.cancel()
            loop.stop()
            log.info("Serveur Modbus arrêté.")
        root.quit()
        sys.exit()

    root.protocol("WM_DELETE_WINDOW", on_closing)

    # Ajouter un message en bas de la fenêtre
    footer_label = tk.Label(root, text="Logiciel gratuit et sans droit", font=("Arial", 8), fg="gray")
    footer_label.pack(side="bottom", pady=10)

    root.mainloop()

if __name__ == "__main__":
    show_popup()
Le we-transfert avec les archives et les certificats inclus : https://we.tl/t-HGjOPcDLHx
M'envoyer un MP si le lien est mort
:D :D :D :D :D :D :D :D :D :D :D :D :D :D

Re: Recuperer l'heure d'un PC

Posté : 09 déc. 2024, 11:09
par Bernardo59
Bravo Andala !
Tu gères le Python, c'est propre ! :ugeek: :ugeek:

Re: Recuperer l'heure d'un PC

Posté : 09 déc. 2024, 11:32
par andala
Bernardo59 a écrit : 09 déc. 2024, 11:09 Bravo Andala !
Tu gères le Python, c'est propre ! :ugeek: :ugeek:
Merci, ça fait toujours plaisir 8-) :lol:

Re: Recuperer l'heure d'un PC

Posté : 09 déc. 2024, 16:43
par andala
MiGaNuTs a écrit : 09 déc. 2024, 08:47 2024-12-09 08_27_09-Window.png

En BCD c'est fait :) (mais pas encore publié)

En UTC c'est faisable facilement, surtout que je donne le code source :)

Ce qui m'anime c'est d'ajouter un compteur de requêtes reçues, mais pour le moment je n’y arrive pas.
J'ai pas abandonné l'idée. Ça va juste me prendre plus de temps qu'espéré.
En regardant ton image, j'ai vu la version de ton modbus doctor ! Il y a la version 2.10 maintenant tu sais ! :lol: :lol: :lol:

Re: Recuperer l'heure d'un PC

Posté : 09 déc. 2024, 16:56
par MiGaNuTs
andala a écrit : 09 déc. 2024, 16:43
MiGaNuTs a écrit : 09 déc. 2024, 08:47 2024-12-09 08_27_09-Window.png

En BCD c'est fait :) (mais pas encore publié)

En UTC c'est faisable facilement, surtout que je donne le code source :)

Ce qui m'anime c'est d'ajouter un compteur de requêtes reçues, mais pour le moment je n’y arrive pas.
J'ai pas abandonné l'idée. Ça va juste me prendre plus de temps qu'espéré.
En regardant ton image, j'ai vu la version de ton modbus doctor ! Il y a la version 2.10 maintenant tu sais ! :lol: :lol: :lol:
Elle marche pas sous windows XP :)