- Add scripts/import_csv.py for direct SQLite database import - Handle date conflicts with warnings (existing data preserved) - Support multiple time slots per entry (semicolon-separated) - Validate day_type, journey_profile_id, and motor_vehicle_id - Update README.md with usage instructions
164 lines
6.2 KiB
Python
Executable File
164 lines
6.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Script d'import CSV vers la base de données SQLite.
|
|
|
|
Format CSV attendu :
|
|
date,day_type,journey_profile_id,motor_vehicle_id,start_time,end_time,comment
|
|
|
|
Exemple :
|
|
2025-06-02,WORK,moteur_seul,familiale,09:00;14:00,17:45;12:00,Travail normal
|
|
2025-06-03,TT,,,09:00,17:45,Télétravail
|
|
|
|
Pour plusieurs plages horaires, séparer les heures par des points-virgules (;).
|
|
|
|
Types de jour valides : WORK, TT, GARDE, ASTREINTE, FORMATION, RTT, CONGE, MALADE, FERIE
|
|
|
|
En cas de conflit sur une date, les données existantes sont conservées et un avertissement est affiché.
|
|
"""
|
|
|
|
import csv
|
|
import sys
|
|
from datetime import date, time
|
|
from pathlib import Path
|
|
|
|
import sqlalchemy as sa
|
|
|
|
# Ajouter le dossier parent au path pour importer les modules
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from app import create_app, db
|
|
from app.models import WorkEntry, TimeSlot
|
|
from app.config_loader import day_types_without_journey, journey_has_motor
|
|
|
|
|
|
DAY_TYPES = {
|
|
"WORK", "TT", "GARDE", "ASTREINTE", "FORMATION", "RTT", "CONGE", "MALADE", "FERIE"
|
|
}
|
|
|
|
|
|
def main(csv_path: str, config_path: str = None):
|
|
"""Importe les données depuis un fichier CSV vers la base de données."""
|
|
|
|
# Créer l'application Flask avec la config
|
|
app = create_app(config_path=config_path)
|
|
|
|
with app.app_context():
|
|
conflicts = []
|
|
imported_count = 0
|
|
|
|
# Lire le fichier CSV
|
|
with open(csv_path, "r", encoding="utf-8") as f:
|
|
csv_reader = csv.DictReader(f)
|
|
|
|
for row_num, row in enumerate(csv_reader, start=2):
|
|
try:
|
|
# Parser la date
|
|
entry_date = date.fromisoformat(row.get("date", "").strip())
|
|
except (ValueError, AttributeError):
|
|
conflicts.append(f"Ligne {row_num}: date invalide ou manquante")
|
|
continue
|
|
|
|
# Valider le type de jour
|
|
day_type = row.get("day_type", "WORK").strip().upper()
|
|
if day_type not in DAY_TYPES:
|
|
conflicts.append(f"Ligne {row_num}: type de jour invalide '{day_type}'")
|
|
continue
|
|
|
|
# Récupérer les autres champs
|
|
journey_profile_id = row.get("journey_profile_id", "").strip() or None
|
|
motor_vehicle_id = row.get("motor_vehicle_id", "").strip() or None
|
|
comment = row.get("comment", "").strip() or None
|
|
|
|
# Si le type de jour n'a pas de trajet, forcer journey_profile_id à None
|
|
if day_type in day_types_without_journey():
|
|
journey_profile_id = None
|
|
|
|
# Si le profil de trajet n'a pas de moteur, forcer motor_vehicle_id à None
|
|
if journey_profile_id and not journey_has_motor(journey_profile_id):
|
|
motor_vehicle_id = None
|
|
|
|
# Vérifier si une entrée existe déjà pour cette date
|
|
existing = db.session.scalar(
|
|
sa.select(WorkEntry).where(WorkEntry.date == entry_date)
|
|
)
|
|
|
|
if existing:
|
|
# Vérifier si les données sont différentes
|
|
is_different = (
|
|
existing.day_type != day_type or
|
|
existing.journey_profile_id != journey_profile_id or
|
|
existing.motor_vehicle_id != motor_vehicle_id or
|
|
existing.comment != comment
|
|
)
|
|
if is_different:
|
|
conflicts.append(
|
|
f"Ligne {row_num}: conflit sur la date {entry_date}. "
|
|
f"Données existantes conservées."
|
|
)
|
|
continue
|
|
|
|
# Créer la nouvelle entrée
|
|
entry = WorkEntry(
|
|
date=entry_date,
|
|
day_type=day_type,
|
|
journey_profile_id=journey_profile_id,
|
|
motor_vehicle_id=motor_vehicle_id,
|
|
comment=comment,
|
|
)
|
|
db.session.add(entry)
|
|
|
|
# Ajouter les plages horaires
|
|
start_times = row.get("start_time", "").split(";")
|
|
end_times = row.get("end_time", "").split(";")
|
|
|
|
for s, e in zip(start_times, end_times):
|
|
s = s.strip()
|
|
e = e.strip()
|
|
if s and e:
|
|
try:
|
|
db.session.add(TimeSlot(
|
|
entry=entry,
|
|
start_time=time.fromisoformat(s),
|
|
end_time=time.fromisoformat(e),
|
|
))
|
|
except (ValueError, AttributeError):
|
|
conflicts.append(f"Ligne {row_num}: format d'heure invalide '{s}' ou '{e}'")
|
|
db.session.rollback()
|
|
break
|
|
else:
|
|
imported_count += 1
|
|
continue
|
|
|
|
db.session.rollback()
|
|
break
|
|
|
|
# Valider et commiter
|
|
try:
|
|
db.session.commit()
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
print(f"Erreur lors du commit: {e}", file=sys.stderr)
|
|
return 1
|
|
|
|
# Afficher les résultats
|
|
print(f"Import terminé: {imported_count} entrée(s) importée(s)")
|
|
|
|
if conflicts:
|
|
print(f"\n⚠️ {len(conflicts)} avertissement(s):")
|
|
for conflict in conflicts:
|
|
print(f" - {conflict}")
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description="Importer un fichier CSV dans la base de données")
|
|
parser.add_argument("csv_file", help="Chemin vers le fichier CSV à importer")
|
|
parser.add_argument("--config", default=None, help="Chemin vers le fichier config.toml (optionnel)")
|
|
|
|
args = parser.parse_args()
|
|
|
|
sys.exit(main(args.csv_file, args.config))
|