diff --git a/python_pkg/cinema_planner/__init__.py b/python_pkg/cinema_planner/__init__.py index a818e25..d3346a9 100644 --- a/python_pkg/cinema_planner/__init__.py +++ b/python_pkg/cinema_planner/__init__.py @@ -1 +1 @@ -# Cinema Planner package initialization +"""Cinema Planner - maximize movies watched in a day.""" diff --git a/python_pkg/cinema_planner/cinema_planner.py b/python_pkg/cinema_planner/cinema_planner.py index 2c4fd46..fb06884 100755 --- a/python_pkg/cinema_planner/cinema_planner.py +++ b/python_pkg/cinema_planner/cinema_planner.py @@ -12,13 +12,25 @@ Usage: ./cinema_planner.py movies.txt # Manual format file """ +from __future__ import annotations + import argparse -from contextlib import redirect_stdout +from contextlib import suppress from dataclasses import dataclass, field +import importlib from io import StringIO +import logging from pathlib import Path import re +import shutil +import subprocess import sys +from typing import TYPE_CHECKING, TextIO + +if TYPE_CHECKING: + import types + +logger = logging.getLogger(__name__) # Default genres to exclude (can be overridden with --all-genres) DEFAULT_EXCLUDED_GENRES = {"horror"} @@ -26,9 +38,30 @@ DEFAULT_EXCLUDED_GENRES = {"horror"} # Ads duration before movie starts (Cinema City shows ~15 min of ads) ADS_DURATION = 15 +# Constants for validation and parsing +_MIN_MANUAL_LINE_PARTS = 3 +_MIN_TITLE_LENGTH = 3 +_DEFAULT_MOVIE_DURATION = 120 +_TITLE_LOOKAHEAD_LINES = 5 +_SEPARATOR_WIDTH = 60 + + +def _try_import(name: str) -> types.ModuleType | None: + """Attempt to import a module, returning None if unavailable.""" + try: + return importlib.import_module(name) + except ImportError: + return None + + +_pdfplumber = _try_import("pdfplumber") +_fitz = _try_import("fitz") + @dataclass class Movie: + """A movie with screening times and metadata.""" + name: str start_times: list[int] duration: int @@ -37,23 +70,26 @@ class Movie: @dataclass class Screening: + """A specific screening of a movie at a particular time.""" + movie: str start: int # minutes from midnight end: int # minutes from midnight - def overlaps(self, other: "Screening", buffer: int = 0) -> bool: - # Account for ADS_DURATION grace period - you can arrive late and still catch the movie - # self ends, other starts: self.end vs other.start + ADS_DURATION (actual content start) - # other ends, self starts: other.end vs self.start + ADS_DURATION + def overlaps(self, other: Screening, buffer: int = 0) -> bool: + """Check if this screening overlaps with another, considering buffer.""" + # Account for ADS_DURATION grace period return not ( self.end + buffer <= other.start + ADS_DURATION or other.end + buffer <= self.start + ADS_DURATION ) def start_str(self) -> str: + """Format start time as HH:MM.""" return f"{self.start // 60:02d}:{self.start % 60:02d}" def end_str(self) -> str: + """Format end time as HH:MM.""" return f"{self.end // 60:02d}:{self.end % 60:02d}" @@ -62,7 +98,8 @@ def parse_time(time_str: str) -> int: time_str = time_str.strip().replace(".", ":") match = re.match(r"(\d{1,2}):(\d{2})", time_str) if not match: - raise ValueError(f"Invalid time format: {time_str}") + msg = f"Invalid time format: {time_str}" + raise ValueError(msg) hours, minutes = int(match.group(1)), int(match.group(2)) return hours * 60 + minutes @@ -99,7 +136,8 @@ def parse_duration(duration_str: str) -> int: if match: return int(match.group(1)) - raise ValueError(f"Invalid duration format: {duration_str}") + msg = f"Invalid duration format: {duration_str}" + raise ValueError(msg) def parse_manual_line(line: str) -> Movie | None: @@ -109,22 +147,57 @@ def parse_manual_line(line: str) -> Movie | None: return None parts = line.split(",") - if len(parts) < 3: - raise ValueError(f"Invalid line format: {line}") + if len(parts) < _MIN_MANUAL_LINE_PARTS: + msg = f"Invalid line format: {line}" + raise ValueError(msg) movie = parts[0].strip() times_str = parts[1].strip() duration_str = ",".join(parts[2:]).strip() - start_times = [] - for time_part in re.split(r"\s+or\s+", times_str, flags=re.IGNORECASE): - start_times.append(parse_time(time_part)) + start_times = [ + parse_time(time_part) + for time_part in re.split(r"\s+or\s+", times_str, flags=re.IGNORECASE) + ] duration = parse_duration(duration_str) return Movie(movie, start_times, duration) +def _try_parse_time(time_str: str) -> int | None: + """Try to parse a time string, returning None on failure.""" + try: + return parse_time(time_str) + except ValueError: + return None + + +def _try_parse_manual_line( + line: str, + error_stream: TextIO | None = None, +) -> Movie | None: + """Try to parse a manual line, writing errors to error_stream.""" + try: + return parse_manual_line(line) + except ValueError as e: + if error_stream is not None: + error_stream.write(f"Warning: {e}\n") + return None + + +def _try_parse_interactive_line(line: str) -> Movie | None: + """Try to parse a line in interactive mode, logging errors.""" + try: + result = parse_manual_line(line) + except ValueError: + logger.exception(" Error parsing input") + return None + if result: + logger.info(" Added: %s", result.name) + return result + + def extract_date_from_html(content: str) -> str | None: """Extract schedule date from Cinema City HTML.""" # Look for date in YYYY-MM-DD format @@ -134,12 +207,18 @@ def extract_date_from_html(content: str) -> str | None: return None -def parse_cinema_city_html(filepath: str) -> tuple[list[Movie], str | None]: - """Parse Cinema City HTML schedule. Returns (movies, date).""" - with open(filepath, encoding="utf-8") as f: +def parse_cinema_city_html( + filepath: str, +) -> tuple[list[Movie], str | None]: + """Parse Cinema City HTML schedule. + + Returns: + Tuple of (movies, date). + """ + with Path(filepath).open(encoding="utf-8") as f: content = f.read() - movies = [] + movies: list[Movie] = [] schedule_date = extract_date_from_html(content) # Split content by movie sections @@ -152,13 +231,16 @@ def parse_cinema_city_html(filepath: str) -> tuple[list[Movie], str | None]: continue movie_name = name_match.group(1).strip() - # Get genres - they appear before the duration, separated by commas - # Pattern: class="mr-sm">Genre1, Genre2 ]*>([^<]+)<\s*span', section) - genres = [] + # Get genres + genre_match = re.search( + r'class="mr-sm"[^>]*>([^<]+)<\s*span', section + ) + genres: list[str] = [] if genre_match: genre_text = genre_match.group(1).strip() - genres = [g.strip() for g in genre_text.split(",") if g.strip()] + genres = [ + g.strip() for g in genre_text.split(",") if g.strip() + ] # Get duration duration_match = re.search(r"(\d+)\s*min", section) @@ -167,20 +249,26 @@ def parse_cinema_city_html(filepath: str) -> tuple[list[Movie], str | None]: duration = int(duration_match.group(1)) # Get screening times - look for time buttons - times = re.findall(r'btn btn-primary btn-lg">\s*(\d{2}:\d{2})\s*<', section) + times = re.findall( + r'btn btn-primary btn-lg">\s*(\d{2}:\d{2})\s*<', section + ) if not times: # Try alternate pattern - times = re.findall(r">\s*(\d{2}:\d{2})\s*\(HTTPS://", section) + times = re.findall( + r">\s*(\d{2}:\d{2})\s*\(HTTPS://", section + ) if times: - start_times = [parse_time(t) for t in times] - # Remove duplicates while preserving order - start_times = list(dict.fromkeys(start_times)) - movies.append(Movie(movie_name, start_times, duration, genres)) + start_times = list(dict.fromkeys( + parse_time(t) for t in times + )) + movies.append( + Movie(movie_name, start_times, duration, genres), + ) # Deduplicate movies (same movie might appear multiple times) - seen = set() - unique_movies = [] + seen: set[str] = set() + unique_movies: list[Movie] = [] for movie in movies: if movie.name not in seen: seen.add(movie.name) @@ -191,149 +279,146 @@ def parse_cinema_city_html(filepath: str) -> tuple[list[Movie], str | None]: def parse_cinema_city_pdf(filepath: str) -> list[Movie]: """Parse Cinema City PDF schedule by extracting text.""" - try: - import pdfplumber - except ImportError: - # Fallback to basic text extraction - return parse_cinema_city_pdf_basic(filepath) + if _pdfplumber is not None: + with _pdfplumber.open(filepath) as pdf: + full_text = "" + for page in pdf.pages: + text = page.extract_text() + if text: + full_text += text + "\n" + return parse_cinema_city_text(full_text) - with pdfplumber.open(filepath) as pdf: - full_text = "" - for page in pdf.pages: - text = page.extract_text() - if text: - full_text += text + "\n" - - return parse_cinema_city_text(full_text) + return _parse_cinema_city_pdf_basic(filepath) -def parse_cinema_city_pdf_basic(filepath: str) -> list[Movie]: +def _parse_cinema_city_pdf_basic(filepath: str) -> list[Movie]: """Basic PDF parsing using PyMuPDF or falling back to subprocess.""" - try: - import fitz # PyMuPDF - - doc = fitz.open(filepath) + if _fitz is not None: + doc = _fitz.open(filepath) full_text = "" for page in doc: full_text += page.get_text() + "\n" doc.close() return parse_cinema_city_text(full_text) - except ImportError: - pass - # Try pdftotext command - import subprocess + pdftotext_path = shutil.which("pdftotext") + if pdftotext_path is None: + _exit_no_pdf_support() try: result = subprocess.run( - ["pdftotext", "-layout", filepath, "-"], + [pdftotext_path, "-layout", filepath, "-"], capture_output=True, text=True, check=True, ) - return parse_cinema_city_text(result.stdout) - except (subprocess.CalledProcessError, FileNotFoundError): - print("Error: Install pdfplumber, PyMuPDF, or poppler-utils for PDF support") - print(" pip install pdfplumber") - print(" pip install pymupdf") - print(" pacman -S poppler") - sys.exit(1) + except subprocess.CalledProcessError: + _exit_no_pdf_support() + + return parse_cinema_city_text(result.stdout) + + +def _exit_no_pdf_support() -> None: + """Log PDF support error and exit.""" + logger.error( + "Install pdfplumber, PyMuPDF, or poppler-utils for PDF support" + ) + logger.error(" pip install pdfplumber") + logger.error(" pip install pymupdf") + logger.error(" pacman -S poppler") + sys.exit(1) def parse_cinema_city_text(text: str) -> list[Movie]: """Parse Cinema City schedule from extracted text.""" - movies = [] + movies: list[Movie] = [] lines = text.split("\n") - current_movie = None - current_duration = None + current_movie: str | None = None + current_duration: int | None = None current_times: list[int] = [] # Patterns for movie titles (all caps, usually) movie_title_pattern = re.compile( r"^([A-ZĄĆĘŁŃÓŚŹŻ][A-ZĄĆĘŁŃÓŚŹŻ0-9\s:,\.\-\!\?\(\)]+)$" ) - - # Known movie indicators duration_pattern = re.compile(r"(\d+)\s*min") time_pattern = re.compile(r"\b(\d{1,2}:\d{2})\b") - i = 0 - while i < len(lines): - line = lines[i].strip() + for i, raw_line in enumerate(lines): + line = raw_line.strip() - # Check if this looks like a movie title - # Cinema City format: MOVIE TITLE on its own line, followed by genre | duration - if movie_title_pattern.match(line) and len(line) > 3: - # Save previous movie if exists + if ( + movie_title_pattern.match(line) + and len(line) > _MIN_TITLE_LENGTH + ): if current_movie and current_times: - movies.append( - Movie( - current_movie, - list(dict.fromkeys(current_times)), - current_duration or 120, - ) - ) + movies.append(Movie( + current_movie, + list(dict.fromkeys(current_times)), + current_duration or _DEFAULT_MOVIE_DURATION, + )) - # Check next lines for duration - current_movie = line.title() # Convert to title case + current_movie = line.title() current_times = [] current_duration = None # Look ahead for duration - for j in range(i + 1, min(i + 5, len(lines))): + end = min(i + _TITLE_LOOKAHEAD_LINES, len(lines)) + for j in range(i + 1, end): dur_match = duration_pattern.search(lines[j]) if dur_match: current_duration = int(dur_match.group(1)) break - # Look for times in current line if current_movie: times_in_line = time_pattern.findall(line) for t in times_in_line: - try: - current_times.append(parse_time(t)) - except ValueError: - pass - - i += 1 + parsed = _try_parse_time(t) + if parsed is not None: + current_times.append(parsed) # Save last movie if current_movie and current_times: - movies.append( - Movie( - current_movie, - list(dict.fromkeys(current_times)), - current_duration or 120, - ) - ) + movies.append(Movie( + current_movie, + list(dict.fromkeys(current_times)), + current_duration or _DEFAULT_MOVIE_DURATION, + )) return movies -def find_best_schedule(movies: list[Movie], buffer: int) -> list[list[Screening]]: +def find_best_schedule( + movies: list[Movie], + buffer: int, +) -> list[list[Screening]]: """Find ALL schedules that maximize number of movies watched.""" - movie_screenings: list[list[Screening]] = [] - for movie in movies: - # Schedule times are accurate - arrive at start, leave at start + duration - # (ads are already factored into published times) - screenings = [ + movie_screenings: list[list[Screening]] = [ + [ Screening(movie.name, start, start + movie.duration) for start in movie.start_times ] - movie_screenings.append(screenings) + for movie in movies + ] best_count = 0 all_best_schedules: list[list[Screening]] = [] - def backtrack(movie_idx: int, current_schedule: list[Screening]): + def _backtrack( + movie_idx: int, + current_schedule: list[Screening], + ) -> None: nonlocal best_count, all_best_schedules if movie_idx == len(movie_screenings): if len(current_schedule) > best_count: best_count = len(current_schedule) all_best_schedules = [current_schedule.copy()] - elif len(current_schedule) == best_count and best_count > 0: + elif ( + len(current_schedule) == best_count + and best_count > 0 + ): all_best_schedules.append(current_schedule.copy()) return @@ -344,108 +429,155 @@ def find_best_schedule(movies: list[Movie], buffer: int) -> list[list[Screening] # Try each screening of current movie for screening in movie_screenings[movie_idx]: - conflicts = any(screening.overlaps(s, buffer) for s in current_schedule) + conflicts = any( + screening.overlaps(s, buffer) + for s in current_schedule + ) if not conflicts: current_schedule.append(screening) - backtrack(movie_idx + 1, current_schedule) + _backtrack(movie_idx + 1, current_schedule) current_schedule.pop() # Also try skipping this movie - backtrack(movie_idx + 1, current_schedule) + _backtrack(movie_idx + 1, current_schedule) - backtrack(0, []) + _backtrack(0, []) # Sort each schedule by start time and return - return [sorted(schedule, key=lambda s: s.start) for schedule in all_best_schedules] + return [ + sorted(schedule, key=lambda s: s.start) + for schedule in all_best_schedules + ] -def print_single_schedule(schedule: list[Screening], schedule_num: int | None = None): - """Print a single schedule.""" +def _format_single_schedule( + schedule: list[Screening], + output: TextIO, +) -> None: + """Format a single schedule to the output stream.""" for i, screening in enumerate(schedule, 1): duration = screening.end - screening.start hours, mins = divmod(duration, 60) - # Movie starts ~15 min after listed time due to ads actual_start = screening.start + ADS_DURATION - actual_start_str = f"{actual_start // 60:02d}:{actual_start % 60:02d}" - print( - f" {i}. {screening.start_str()} - {screening.end_str()} {screening.movie}" + actual_start_str = ( + f"{actual_start // 60:02d}:{actual_start % 60:02d}" + ) + output.write( + f" {i}. {screening.start_str()} - " + f"{screening.end_str()} {screening.movie}\n" + ) + output.write( + f" Duration: {hours}h {mins}m " + f"(movie starts ~{actual_start_str})\n" ) - print(f" Duration: {hours}h {mins}m (movie starts ~{actual_start_str})") if i < len(schedule): gap = schedule[i].start - screening.end if gap > 0: - print(f" [{gap} min break]") - print() + output.write(f" [{gap} min break]\n") + output.write("\n") -def print_schedules( +def _format_schedules( schedules: list[list[Screening]], all_movies: list[str], date: str | None = None, max_display: int = 5, -): - """Print optimal schedules (up to max_display).""" + *, + output: TextIO | None = None, +) -> None: + """Format optimal schedules to the output stream.""" + if output is None: + output = sys.stdout + + sep = "=" * _SEPARATOR_WIDTH + thin_sep = "\u2500" * _SEPARATOR_WIDTH + if not schedules or not schedules[0]: - print("No movies can be scheduled!") + output.write("No movies can be scheduled!\n") return num_movies = len(schedules[0]) num_schedules = len(schedules) - print(f"\n{'=' * 60}") + output.write(f"\n{sep}\n") if date: - print(f" OPTIMAL CINEMA SCHEDULES - {date}") + output.write(f" OPTIMAL CINEMA SCHEDULES - {date}\n") else: - print(" OPTIMAL CINEMA SCHEDULES") - print(f" {num_movies} movies, {num_schedules} possible combination(s)") - print(f"{'=' * 60}\n") + output.write(" OPTIMAL CINEMA SCHEDULES\n") + output.write( + f" {num_movies} movies, " + f"{num_schedules} possible combination(s)\n" + ) + output.write(f"{sep}\n\n") display_count = min(num_schedules, max_display) for idx, schedule in enumerate(schedules[:display_count], 1): if num_schedules > 1: - print(f"{'─' * 60}") - print(f" OPTION {idx}:") - print(f"{'─' * 60}\n") - print_single_schedule(schedule) + output.write(f"{thin_sep}\n") + output.write(f" OPTION {idx}:\n") + output.write(f"{thin_sep}\n\n") + _format_single_schedule(schedule, output) if num_schedules > display_count: - print(f"{'─' * 60}") - print(f" ... and {num_schedules - display_count} more combinations") - print(" (use -n to show more, e.g., -n 10)") - print() + output.write(f"{thin_sep}\n") + output.write( + f" ... and {num_schedules - display_count} " + "more combinations\n" + ) + output.write(" (use -n to show more, e.g., -n 10)\n") + output.write("\n") # Show skipped movies (from first schedule as reference) scheduled_movies = {s.movie for s in schedules[0]} skipped = [m for m in all_movies if m not in scheduled_movies] if skipped and num_schedules == 1: - print(f"{'─' * 60}") - print(f" Skipped movies ({len(skipped)}):") + output.write(f"{thin_sep}\n") + output.write(f" Skipped movies ({len(skipped)}):\n") for movie in skipped: - print(f" - {movie}") - print() + output.write(f" - {movie}\n") + output.write("\n") -def print_all_movies(movies: list[Movie], date: str | None = None): - """Print all parsed movies.""" - print(f"\n{'─' * 60}") +def _format_all_movies( + movies: list[Movie], + date: str | None = None, + *, + output: TextIO | None = None, +) -> None: + """Format all parsed movies to the output stream.""" + if output is None: + output = sys.stdout + + thin_sep = "\u2500" * _SEPARATOR_WIDTH + + output.write(f"\n{thin_sep}\n") if date: - print(f" Parsed {len(movies)} movies for {date}:") + output.write(f" Parsed {len(movies)} movies for {date}:\n") else: - print(f" Parsed {len(movies)} movies:") - print(f"{'─' * 60}") + output.write(f" Parsed {len(movies)} movies:\n") + output.write(f"{thin_sep}\n") for movie in movies: times_str = ", ".join( - f"{t//60:02d}:{t%60:02d}" for t in sorted(movie.start_times) + f"{t // 60:02d}:{t % 60:02d}" + for t in sorted(movie.start_times) ) - genre_str = f" [{', '.join(movie.genres)}]" if movie.genres else "" - print(f" {movie.name} ({movie.duration} min){genre_str}") - print(f" Times: {times_str}") - print() + genre_str = ( + f" [{', '.join(movie.genres)}]" if movie.genres else "" + ) + output.write( + f" {movie.name} ({movie.duration} min){genre_str}\n" + ) + output.write(f" Times: {times_str}\n") + output.write("\n") -def main(): +def _build_parser() -> argparse.ArgumentParser: + """Build the argument parser for the cinema planner.""" parser = argparse.ArgumentParser( - description="Plan your cinema day to watch as many movies as possible.", + description=( + "Plan your cinema day to watch " + "as many movies as possible." + ), formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Supports Cinema City HTML/PDF schedules (auto-detected). @@ -458,7 +590,9 @@ Example: The Matrix, 12:00 or 16:45, 2h 16m """, ) - parser.add_argument("input_file", nargs="?", help="Input file (HTML/PDF/TXT)") + parser.add_argument( + "input_file", nargs="?", help="Input file (HTML/PDF/TXT)" + ) parser.add_argument( "-b", "--buffer", @@ -482,19 +616,19 @@ Example: "-s", "--select", type=str, - help="Comma-separated list of movie names to include (partial match)", + help="Comma-separated movie names to include (partial match)", ) parser.add_argument( "-x", "--exclude", type=str, - help="Comma-separated list of movie names to exclude (partial match)", + help="Comma-separated movie names to exclude (partial match)", ) parser.add_argument( "-g", "--exclude-genre", type=str, - help="Comma-separated list of genres to exclude (e.g., 'Horror,Thriller')", + help="Comma-separated genres to exclude (e.g., 'Horror')", ) parser.add_argument( "--all-genres", @@ -512,7 +646,7 @@ Example: "--max-schedules", type=int, default=5, - help="Maximum number of schedule options to display (default: 5)", + help="Max schedule options to display (default: 5)", ) parser.add_argument( "-m", @@ -520,146 +654,239 @@ Example: type=str, help="Only show schedules containing this movie (partial match)", ) + return parser - args = parser.parse_args() - movies = [] - schedule_date = None - - if args.interactive: - print("Enter movies (empty line to finish):") - print("Format: Title, start1 [or start2 ...], duration") - print("Example: Inception, 10:30 or 14:00, 2h 28m") - print() +def _load_movies_interactive() -> list[Movie]: + """Load movies through interactive terminal input.""" + logger.info("Enter movies (empty line to finish):") + logger.info("Format: Title, start1 [or start2 ...], duration") + logger.info("Example: Inception, 10:30 or 14:00, 2h 28m") + logger.info("") + movies: list[Movie] = [] + with suppress(EOFError): while True: - try: - line = input("> ") - except EOFError: - break + line = input("> ") if not line.strip(): break - try: - result = parse_manual_line(line) - if result: - movies.append(result) - print(f" Added: {result.name}") - except ValueError as e: - print(f" Error: {e}") - elif args.input_file: - filepath = Path(args.input_file) - suffix = filepath.suffix.lower() + result = _try_parse_interactive_line(line) + if result: + movies.append(result) + return movies - print(f"Parsing: {filepath}") - if suffix == ".html" or suffix == ".htm": - movies, schedule_date = parse_cinema_city_html(str(filepath)) - elif suffix == ".pdf": - movies = parse_cinema_city_pdf(str(filepath)) - else: - # Assume manual format - with open(filepath) as f: - for line in f: - try: - result = parse_manual_line(line) - if result: - movies.append(result) - except ValueError as e: - print(f"Warning: {e}", file=sys.stderr) - else: - print("Enter movies (Ctrl+D when done):") - for line in sys.stdin: - try: - result = parse_manual_line(line) - if result: - movies.append(result) - except ValueError as e: - print(f"Warning: {e}", file=sys.stderr) +def _load_movies_from_file( + filepath: Path, +) -> tuple[list[Movie], str | None]: + """Load movies from a file (HTML, PDF, or manual format).""" + suffix = filepath.suffix.lower() + logger.info("Parsing: %s", filepath) - if not movies: - print("No movies found!") - sys.exit(1) + if suffix in {".html", ".htm"}: + return parse_cinema_city_html(str(filepath)) - # Filter movies if requested + if suffix == ".pdf": + return parse_cinema_city_pdf(str(filepath)), None + + movies: list[Movie] = [] + with filepath.open() as f: + for line in f: + result = _try_parse_manual_line(line, sys.stderr) + if result: + movies.append(result) + return movies, None + + +def _load_movies_from_stdin() -> list[Movie]: + """Load movies from standard input.""" + logger.info("Enter movies (Ctrl+D when done):") + movies: list[Movie] = [] + for line in sys.stdin: + result = _try_parse_manual_line(line, sys.stderr) + if result: + movies.append(result) + return movies + + +def _filter_movies( + movies: list[Movie], + args: argparse.Namespace, +) -> tuple[list[Movie], set[str]]: + """Apply name and genre filters to movies.""" if args.select: - select_terms = [t.strip().lower() for t in args.select.split(",")] - movies = [m for m in movies if any(t in m.name.lower() for t in select_terms)] - print(f"Selected {len(movies)} movies matching: {args.select}") + select_terms = [ + t.strip().lower() for t in args.select.split(",") + ] + movies = [ + m + for m in movies + if any(t in m.name.lower() for t in select_terms) + ] + logger.info( + "Selected %d movies matching: %s", + len(movies), + args.select, + ) if args.exclude: - exclude_terms = [t.strip().lower() for t in args.exclude.split(",")] - movies = [ - m for m in movies if not any(t in m.name.lower() for t in exclude_terms) + exclude_terms = [ + t.strip().lower() for t in args.exclude.split(",") ] - print(f"After name exclusion: {len(movies)} movies") + movies = [ + m + for m in movies + if not any(t in m.name.lower() for t in exclude_terms) + ] + logger.info("After name exclusion: %d movies", len(movies)) - # Genre filtering - excluded_genres = set() + excluded_genres: set[str] = set() if not args.all_genres: excluded_genres.update(DEFAULT_EXCLUDED_GENRES) if args.exclude_genre: - excluded_genres.update(g.strip().lower() for g in args.exclude_genre.split(",")) + excluded_genres.update( + g.strip().lower() for g in args.exclude_genre.split(",") + ) if excluded_genres: before_count = len(movies) movies = [ - m for m in movies if not any(g.lower() in excluded_genres for g in m.genres) + m + for m in movies + if not any( + g.lower() in excluded_genres for g in m.genres + ) ] filtered_count = before_count - len(movies) if filtered_count > 0: - print( - f"Excluded {filtered_count} movies by genre: {', '.join(sorted(excluded_genres))}" + logger.info( + "Excluded %d movies by genre: %s", + filtered_count, + ", ".join(sorted(excluded_genres)), ) + return movies, excluded_genres + + +def _apply_must_watch_filter( + schedules: list[list[Screening]], + must_watch: str, +) -> list[list[Screening]]: + """Filter schedules to only those containing must-watch movie.""" + must_watch_lower = must_watch.lower() + filtered = [ + s + for s in schedules + if any( + must_watch_lower in screening.movie.lower() + for screening in s + ) + ] + if filtered: + logger.info( + "Filtered to %d schedules containing '%s'", + len(filtered), + must_watch, + ) + return filtered + + logger.warning( + "No optimal schedules contain '%s'", must_watch + ) + logger.warning("Showing all schedules instead.") + return schedules + + +def _output_schedules( + schedules: list[list[Screening]], + all_movie_names: list[str], + schedule_date: str | None, + args: argparse.Namespace, + excluded_genres: set[str], +) -> None: + """Handle schedule output, optionally saving to file.""" + output_buffer = StringIO() + _format_schedules( + schedules, + all_movie_names, + schedule_date, + args.max_schedules, + output=output_buffer, + ) + schedule_output = output_buffer.getvalue() + sys.stdout.write(schedule_output) + + if args.output or schedule_date: + output_file = ( + Path(args.output) + if args.output + else Path(f"cinema_plan_{schedule_date}.txt") + ) + with output_file.open("w") as f: + f.write( + f"Generated: {schedule_date or 'unknown date'}\n" + ) + f.write(f"Movies considered: {len(all_movie_names)}\n") + f.write(f"Buffer time: {args.buffer} minutes\n") + if excluded_genres: + f.write( + "Excluded genres: " + f"{', '.join(sorted(excluded_genres))}\n" + ) + f.write(schedule_output) + logger.info("Schedule saved to: %s", output_file) + + +def main() -> None: + """Run the cinema day planner CLI.""" + logging.basicConfig(format="%(message)s", level=logging.INFO) + + parser = _build_parser() + args = parser.parse_args() + + movies: list[Movie] = [] + schedule_date: str | None = None + + if args.interactive: + movies = _load_movies_interactive() + elif args.input_file: + movies, schedule_date = _load_movies_from_file( + Path(args.input_file), + ) + else: + movies = _load_movies_from_stdin() + + if not movies: + logger.error("No movies found!") + sys.exit(1) + + movies, excluded_genres = _filter_movies(movies, args) + if args.list: - print_all_movies(movies, schedule_date) + _format_all_movies(movies, schedule_date) return - print(f"\nOptimizing schedule for {len(movies)} movies...") - print(f"Buffer time between movies: {args.buffer} minutes") + logger.info( + "\nOptimizing schedule for %d movies...", len(movies) + ) + logger.info( + "Buffer time between movies: %d minutes", args.buffer + ) schedules = find_best_schedule(movies, args.buffer) all_movie_names = [m.name for m in movies] - # Filter schedules if must-watch movie specified if args.must_watch: - must_watch_lower = args.must_watch.lower() - filtered = [ - s - for s in schedules - if any(must_watch_lower in screening.movie.lower() for screening in s) - ] - if filtered: - print( - f"Filtered to {len(filtered)} schedules containing '{args.must_watch}'" - ) - schedules = filtered - else: - print(f"Warning: No optimal schedules contain '{args.must_watch}'") - print("Showing all schedules instead.") + schedules = _apply_must_watch_filter( + schedules, args.must_watch + ) - # Capture output if saving to file - output_buffer = StringIO() - with redirect_stdout(output_buffer): - print_schedules(schedules, all_movie_names, schedule_date, args.max_schedules) - - schedule_output = output_buffer.getvalue() - print(schedule_output) # Still show in terminal - - # Save to file - if args.output or schedule_date: - if args.output: - output_file = Path(args.output) - else: - output_file = Path(f"cinema_plan_{schedule_date}.txt") - - with open(output_file, "w") as f: - f.write(f"Generated: {schedule_date or 'unknown date'}\n") - f.write(f"Movies considered: {len(movies)}\n") - f.write(f"Buffer time: {args.buffer} minutes\n") - if excluded_genres: - f.write(f"Excluded genres: {', '.join(sorted(excluded_genres))}\n") - f.write(schedule_output) - print(f"Schedule saved to: {output_file}") + _output_schedules( + schedules, + all_movie_names, + schedule_date, + args, + excluded_genres, + ) if __name__ == "__main__":