diff --git a/python_pkg/word_frequency/analyzer.py b/python_pkg/word_frequency/analyzer.py index 5cdf807..b20aa06 100755 --- a/python_pkg/word_frequency/analyzer.py +++ b/python_pkg/word_frequency/analyzer.py @@ -22,11 +22,14 @@ from __future__ import annotations import argparse from collections import Counter +import logging from pathlib import Path import re import sys from typing import TYPE_CHECKING +logger = logging.getLogger(__name__) + if TYPE_CHECKING: from collections.abc import Sequence @@ -90,9 +93,7 @@ def read_files(filepaths: Sequence[str | Path]) -> str: Returns: Combined text content of all files. """ - texts = [] - for filepath in filepaths: - texts.append(read_file(filepath)) + texts = [read_file(filepath) for filepath in filepaths] return "\n".join(texts) @@ -244,15 +245,15 @@ def main(argv: Sequence[str] | None = None) -> int: if args.output: Path(args.output).write_text(result, encoding="utf-8") - print(f"Output written to {args.output}") + logger.info("Output written to %s", args.output) else: - print(result) + sys.stdout.write(result + "\n") - except FileNotFoundError as e: - print(f"Error: File not found - {e}", file=sys.stderr) + except FileNotFoundError: + logger.exception("File not found") return 1 - except UnicodeDecodeError as e: - print(f"Error: Could not decode file as UTF-8 - {e}", file=sys.stderr) + except UnicodeDecodeError: + logger.exception("Could not decode file as UTF-8") return 1 return 0 diff --git a/python_pkg/word_frequency/anki_generator.py b/python_pkg/word_frequency/anki_generator.py index dced133..7251c47 100755 --- a/python_pkg/word_frequency/anki_generator.py +++ b/python_pkg/word_frequency/anki_generator.py @@ -4,27 +4,35 @@ Generates Anki-compatible flashcard decks from the vocabulary needed to understand excerpts of a given length. -Usage: +Usage:: + # Generate flashcards for a 20-word excerpt - python -m python_pkg.word_frequency.anki_generator --file text.txt --length 20 + python -m python_pkg.word_frequency.anki_generator \ + --file text.txt --length 20 # Specify source language (auto-detected by default) - python -m python_pkg.word_frequency.anki_generator --file text.txt --length 20 --from pl + python -m python_pkg.word_frequency.anki_generator \ + --file text.txt --length 20 --from pl # Custom output file - python -m python_pkg.word_frequency.anki_generator --file text.txt --length 20 --output polish_vocab.txt + python -m python_pkg.word_frequency.anki_generator \ + --file text.txt --length 20 --output polish_vocab.txt # Include example sentences/context - python -m python_pkg.word_frequency.anki_generator --file text.txt --length 20 --include-context + python -m python_pkg.word_frequency.anki_generator \ + --file text.txt --length 20 --include-context Output: - Creates a semicolon-separated text file that can be imported into Anki. - Format: word;translation;frequency_rank;example_context (optional) + Creates a semicolon-separated text file importable into Anki. + Format: ``word;translation;frequency_rank;example_context`` """ from __future__ import annotations import argparse +import contextlib +from dataclasses import dataclass +import logging from pathlib import Path import re import subprocess @@ -36,14 +44,58 @@ if TYPE_CHECKING: try: from python_pkg.word_frequency.analyzer import read_file + from python_pkg.word_frequency.cache import ( + AnkiDeckKey, + clear_all_caches, + get_all_cache_stats, + get_anki_deck_cache, + get_vocab_curve_cache, + ) from python_pkg.word_frequency.translator import ( detect_language, translate_words_batch, ) except ImportError: from analyzer import read_file + from cache import ( + AnkiDeckKey, + clear_all_caches, + get_all_cache_stats, + get_anki_deck_cache, + get_vocab_curve_cache, + ) from translator import detect_language, translate_words_batch +logger = logging.getLogger(__name__) + +_MIN_VOCAB_DUMP_PARTS = 2 +_MIN_EXCERPT_PARTS = 3 +_ONE_KB = 1024 +_ONE_MB = 1024 * 1024 + + +@dataclass(frozen=True) +class FlashcardOptions: + """Options for flashcard generation.""" + + source_lang: str | None = None + target_lang: str = "en" + deck_name: str | None = None + include_context: bool = False + no_translate: bool = False + force: bool = False + + +@dataclass(frozen=True) +class DeckInput: + """Input data for Anki deck generation.""" + + words_with_ranks: list[tuple[str, int]] + source_lang: str + target_lang: str = "en" + contexts: dict[str, str] | None = None + deck_name: str = "Vocabulary" + # Path to C vocabulary_curve executable C_EXECUTABLE = ( @@ -78,10 +130,11 @@ def run_vocabulary_curve( subprocess.CalledProcessError: If execution fails. """ if not C_EXECUTABLE.exists(): - raise FileNotFoundError( + msg = ( f"C executable not found at {C_EXECUTABLE}. " "Please compile it first: cd C/vocabulary_curve && make" ) + raise FileNotFoundError(msg) cmd = [str(C_EXECUTABLE), str(filepath), str(max_length)] if dump_vocab: @@ -115,10 +168,11 @@ def run_vocabulary_curve_inverse( subprocess.CalledProcessError: If execution fails. """ if not C_EXECUTABLE.exists(): - raise FileNotFoundError( + msg = ( f"C executable not found at {C_EXECUTABLE}. " "Please compile it first: cd C/vocabulary_curve && make" ) + raise FileNotFoundError(msg) cmd = [str(C_EXECUTABLE), str(filepath), "--max-vocab", str(max_vocab)] if dump_vocab: @@ -134,6 +188,57 @@ def run_vocabulary_curve_inverse( return result.stdout +def _parse_vocab_dump(lines: list[str]) -> list[tuple[str, int]]: + """Parse VOCAB_DUMP section from output lines. + + Args: + lines: Output lines from vocabulary_curve. + + Returns: + List of (word, rank) tuples. + """ + all_vocab: list[tuple[str, int]] = [] + in_vocab_dump = False + for line in lines: + stripped = line.strip() + if stripped == "VOCAB_DUMP_START": + in_vocab_dump = True + continue + if stripped == "VOCAB_DUMP_END": + break + if in_vocab_dump and ";" in stripped: + parts = stripped.split(";") + if len(parts) == _MIN_VOCAB_DUMP_PARTS: + word, rank_str = parts + with contextlib.suppress(ValueError): + all_vocab.append((word, int(rank_str))) + return all_vocab + + +def _parse_excerpt_lines(lines: list[str], start: int) -> str: + """Parse excerpt text from output lines starting after 'Excerpt:'. + + Args: + lines: Output lines. + start: Index of the line after 'Excerpt:'. + + Returns: + Joined excerpt text. + """ + excerpt_parts: list[str] = [] + idx = start + while idx < len(lines): + next_line = lines[idx].strip() + next_line = next_line.removeprefix('"') + if next_line.endswith('"'): + next_line = next_line[:-1] + excerpt_parts.append(next_line) + break + excerpt_parts.append(next_line) + idx += 1 + return " ".join(excerpt_parts) + + def parse_inverse_mode_output( output: str, ) -> tuple[str, int, int, list[tuple[str, int]]]: @@ -149,58 +254,77 @@ def parse_inverse_mode_output( excerpt = "" excerpt_length = 0 max_rank_used = 0 - all_vocab: list[tuple[str, int]] = [] - for i, line in enumerate(lines): - line = line.strip() + for i, raw_line in enumerate(lines): + line = raw_line.strip() if line.startswith("LONGEST EXCERPT:"): parts = line.split() - if len(parts) >= 3: + if len(parts) >= _MIN_EXCERPT_PARTS: excerpt_length = int(parts[2]) elif line.startswith("Excerpt:"): - # Next line(s) contain the excerpt - i += 1 - excerpt_parts = [] - while i < len(lines): - next_line = lines[i].strip() - if next_line.startswith('"'): - next_line = next_line[1:] - if next_line.endswith('"'): - next_line = next_line[:-1] - excerpt_parts.append(next_line) - break - excerpt_parts.append(next_line) - i += 1 - excerpt = " ".join(excerpt_parts) + excerpt = _parse_excerpt_lines(lines, i + 1) elif line.startswith("Rarest word used:"): - # Parse "word (#rank)" match = re.search(r"\(#(\d+)\)", line) if match: max_rank_used = int(match.group(1)) - # Parse VOCAB_DUMP section if present - in_vocab_dump = False - for line in lines: - if line.strip() == "VOCAB_DUMP_START": - in_vocab_dump = True - continue - if line.strip() == "VOCAB_DUMP_END": - break - if in_vocab_dump and ";" in line: - parts = line.strip().split(";") - if len(parts) == 2: - word, rank_str = parts - try: - all_vocab.append((word, int(rank_str))) - except ValueError: - pass - + all_vocab = _parse_vocab_dump(lines) return excerpt, excerpt_length, max_rank_used, all_vocab +def _parse_target_length_block( + lines: list[str], + target_length: int, +) -> tuple[str, list[tuple[str, int]]]: + """Parse the [Length N] block from vocabulary curve output. + + Args: + lines: Output lines. + target_length: Target excerpt length to find. + + Returns: + Tuple of (excerpt, excerpt_words). + """ + excerpt = "" + excerpt_words: list[tuple[str, int]] = [] + i = 0 + while i < len(lines): + if lines[i].strip().startswith(f"[Length {target_length}]"): + i += 1 + # Find excerpt line + while i < len(lines) and not lines[i].strip().startswith( + "Excerpt:" + ): + i += 1 + if i < len(lines): + excerpt_line = lines[i].strip() + if '"' in excerpt_line: + start = excerpt_line.index('"') + 1 + end = excerpt_line.rindex('"') + excerpt = excerpt_line[start:end] + # Find words line + i += 1 + while i < len(lines) and not lines[i].strip().startswith( + "Words:" + ): + i += 1 + if i < len(lines): + words_line = lines[i].strip() + if words_line.startswith("Words:"): + words_part = words_line[6:].strip() + pattern = r"(\S+)\(#(\d+)\)" + matches = re.findall(pattern, words_part) + excerpt_words = [ + (w, int(r)) for w, r in matches + ] + break + i += 1 + return excerpt, excerpt_words + + def parse_vocabulary_curve_output( output: str, target_length: int ) -> tuple[str, list[tuple[str, int]], list[tuple[str, int]]]: @@ -213,61 +337,15 @@ def parse_vocabulary_curve_output( Returns: Tuple of (excerpt_text, excerpt_words, all_vocab_words). excerpt_words: words in the excerpt with their ranks. - all_vocab_words: all words up to max rank (from VOCAB_DUMP if present). + all_vocab_words: all words up to max rank + (from VOCAB_DUMP if present). """ lines = output.split("\n") - excerpt = "" - excerpt_words: list[tuple[str, int]] = [] - all_vocab: list[tuple[str, int]] = [] - # Find the line for the target length - i = 0 - while i < len(lines): - line = lines[i] - if line.strip().startswith(f"[Length {target_length}]"): - # Found our target length, now get excerpt and words - i += 1 - # Find excerpt line - while i < len(lines) and not lines[i].strip().startswith("Excerpt:"): - i += 1 - if i < len(lines): - excerpt_line = lines[i].strip() - if '"' in excerpt_line: - start = excerpt_line.index('"') + 1 - end = excerpt_line.rindex('"') - excerpt = excerpt_line[start:end] - - # Find words line - i += 1 - while i < len(lines) and not lines[i].strip().startswith("Words:"): - i += 1 - if i < len(lines): - words_line = lines[i].strip() - if words_line.startswith("Words:"): - words_part = words_line[6:].strip() - # Parse "word(#rank), word2(#rank2), ..." - pattern = r"(\S+)\(#(\d+)\)" - matches = re.findall(pattern, words_part) - excerpt_words = [(w, int(r)) for w, r in matches] - break - i += 1 - - # Parse VOCAB_DUMP section if present - in_vocab_dump = False - for line in lines: - if line.strip() == "VOCAB_DUMP_START": - in_vocab_dump = True - continue - if line.strip() == "VOCAB_DUMP_END": - break - if in_vocab_dump and ";" in line: - parts = line.strip().split(";") - if len(parts) == 2: - word, rank_str = parts - try: - all_vocab.append((word, int(rank_str))) - except ValueError: - pass + excerpt, excerpt_words = _parse_target_length_block( + lines, target_length + ) + all_vocab = _parse_vocab_dump(lines) return excerpt, excerpt_words, all_vocab @@ -307,12 +385,86 @@ def find_word_contexts( return contexts -def generate_anki_deck( +def _format_excerpt_card( + excerpt: str, + excerpt_words: list[tuple[str, int]] | None, +) -> str: + """Format the excerpt as the first Anki card. + + Args: + excerpt: The target excerpt text. + excerpt_words: Words in the excerpt with ranks. + + Returns: + Formatted excerpt card line. + """ + excerpt_escaped = excerpt.replace(";", ",") + if excerpt_words: + most_frequent = min(excerpt_words, key=lambda x: x[1])[0] + rarest = max(excerpt_words, key=lambda x: x[1])[0] + if most_frequent != rarest: + pattern_rare = re.compile( + rf"\b({re.escape(rarest)})\b", re.IGNORECASE + ) + excerpt_escaped = pattern_rare.sub( + r"\1", excerpt_escaped + ) + pattern_freq = re.compile( + rf"\b({re.escape(most_frequent)})\b", + re.IGNORECASE, + ) + excerpt_escaped = pattern_freq.sub( + r"\1", excerpt_escaped + ) + else: + pattern = re.compile( + rf"\b({re.escape(most_frequent)})\b", + re.IGNORECASE, + ) + excerpt_escaped = pattern.sub( + r"\1", excerpt_escaped + ) + return f"\U0001f4d6 TARGET EXCERPT;{excerpt_escaped};#0" + + +def _build_translation_lookup( words_with_ranks: list[tuple[str, int]], source_lang: str, - target_lang: str = "en", - contexts: dict[str, str] | None = None, - deck_name: str = "Vocabulary", + target_lang: str, + *, + no_translate: bool = False, +) -> dict[str, str]: + """Build word-to-translation lookup dict. + + Args: + words_with_ranks: List of (word, rank) tuples. + source_lang: Source language code. + target_lang: Target language code. + no_translate: If True, use placeholder translations. + + Returns: + Dict mapping lowercase word to translation. + """ + words = [w for w, _ in words_with_ranks] + if no_translate: + return {w.lower(): "[TODO]" for w in words} + translations = translate_words_batch(words, source_lang, target_lang) + trans_lookup: dict[str, str] = {} + for result in translations: + if result.success: + trans_lookup[result.source_word.lower()] = ( + result.translated_word + ) + else: + trans_lookup[result.source_word.lower()] = ( + f"[{result.source_word}]" + ) + return trans_lookup + + +def generate_anki_deck( + deck_input: DeckInput, + *, include_context: bool = False, no_translate: bool = False, excerpt: str = "", @@ -321,15 +473,11 @@ def generate_anki_deck( """Generate Anki-compatible deck content. Args: - words_with_ranks: List of (word, rank) tuples. - source_lang: Source language code. - target_lang: Target language code (default: en). - contexts: Optional dict of word -> context. - deck_name: Name for the deck. + deck_input: Core deck data (words, langs, contexts, name). include_context: Whether to include context in cards. no_translate: If True, skip translation (use placeholder). excerpt: The target excerpt text to include in cards. - excerpt_words: List of (word, rank) tuples for words in the excerpt. + excerpt_words: Words in the excerpt with ranks. Returns: Semicolon-separated content ready for Anki import. @@ -339,73 +487,45 @@ def generate_anki_deck( # Add Anki headers lines.append("#separator:semicolon") lines.append("#html:true") - lines.append(f"#deck:{deck_name}") - lines.append(f"#tags:vocabulary {source_lang}") + lines.append(f"#deck:{deck_input.deck_name}") + lines.append(f"#tags:vocabulary {deck_input.source_lang}") if include_context: lines.append("#columns:Front;Back;Rank;Context") else: lines.append("#columns:Front;Back;Rank") lines.append("") # Empty line before data - # Add excerpt as first card (goal/context card) if excerpt: - excerpt_escaped = excerpt.replace(";", ",") - # Use excerpt_words from C output (has correct ranks) - if excerpt_words: - # Most frequent = lowest rank (italics), rarest = highest rank (bold) - most_frequent = min(excerpt_words, key=lambda x: x[1])[0] - rarest = max(excerpt_words, key=lambda x: x[1])[0] - # Apply formatting - rarest first (bold), then most frequent (italics) - # to avoid nested tag issues if they're the same word - if most_frequent != rarest: - pattern_rare = re.compile(rf"\b({re.escape(rarest)})\b", re.IGNORECASE) - excerpt_escaped = pattern_rare.sub(r"\1", excerpt_escaped) - pattern_freq = re.compile( - rf"\b({re.escape(most_frequent)})\b", re.IGNORECASE - ) - excerpt_escaped = pattern_freq.sub(r"\1", excerpt_escaped) - else: - # Same word is both most and least frequent - use bold+italic - pattern = re.compile( - rf"\b({re.escape(most_frequent)})\b", re.IGNORECASE - ) - excerpt_escaped = pattern.sub(r"\1", excerpt_escaped) - lines.append(f"📖 TARGET EXCERPT;{excerpt_escaped};#0") + lines.append(_format_excerpt_card(excerpt, excerpt_words)) - # Get translations (or skip if no_translate) - words = [w for w, _ in words_with_ranks] - if no_translate: - trans_lookup = {w.lower(): "[TODO]" for w in words} - else: - translations = translate_words_batch(words, source_lang, target_lang) - # Build translation lookup - trans_lookup = {} - for result in translations: - if result.success: - trans_lookup[result.source_word.lower()] = result.translated_word - else: - trans_lookup[result.source_word.lower()] = f"[{result.source_word}]" + trans_lookup = _build_translation_lookup( + deck_input.words_with_ranks, + deck_input.source_lang, + deck_input.target_lang, + no_translate=no_translate, + ) # Generate cards - for word, rank in words_with_ranks: + for word, rank in deck_input.words_with_ranks: translation = trans_lookup.get(word.lower(), f"[{word}]") # Escape semicolons in fields word_escaped = word.replace(";", ",") translation_escaped = translation.replace(";", ",") - if include_context and contexts: - context = contexts.get(word.lower(), "") - # Highlight the word in context + if include_context and deck_input.contexts: + context = deck_input.contexts.get(word.lower(), "") if context: context_escaped = context.replace(";", ",") - # Make target word bold in context pattern = re.compile(re.escape(word), re.IGNORECASE) - context_escaped = pattern.sub(f"{word}", context_escaped) + context_escaped = pattern.sub( + f"{word}", context_escaped + ) else: context_escaped = "" lines.append( - f"{word_escaped};{translation_escaped};#{rank};{context_escaped}" + f"{word_escaped};{translation_escaped}" + f";#{rank};{context_escaped}" ) else: lines.append(f"{word_escaped};{translation_escaped};#{rank}") @@ -428,12 +548,7 @@ def get_cached_excerpt( """ if force: return None - try: - from python_pkg.word_frequency.cache import get_vocab_curve_cache - - return get_vocab_curve_cache().get(filepath, length) - except ImportError: - return None + return get_vocab_curve_cache().get(filepath, length) def cache_excerpt( @@ -447,31 +562,18 @@ def cache_excerpt( excerpt: The excerpt text. words: List of (word, rank) tuples. """ - try: - from python_pkg.word_frequency.cache import get_vocab_curve_cache - - get_vocab_curve_cache().set(filepath, length, excerpt, words) - except ImportError: - pass + get_vocab_curve_cache().set(filepath, length, excerpt, words) def get_cached_deck( - filepath: Path, - length: int, - target_lang: str, - include_context: bool, - all_vocab: bool, + key: AnkiDeckKey, *, force: bool = False, ) -> tuple[str, str, int, int] | None: """Get cached Anki deck if available. Args: - filepath: Path to source file. - length: Excerpt length. - target_lang: Target language. - include_context: Whether context is included. - all_vocab: Whether all vocab is included. + key: Cache key parameters. force: If True, ignore cache. Returns: @@ -479,22 +581,11 @@ def get_cached_deck( """ if force: return None - try: - from python_pkg.word_frequency.cache import get_anki_deck_cache - - return get_anki_deck_cache().get( - filepath, length, target_lang, include_context, all_vocab - ) - except ImportError: - return None + return get_anki_deck_cache().get(key) def cache_deck( - filepath: Path, - length: int, - target_lang: str, - include_context: bool, - all_vocab: bool, + key: AnkiDeckKey, anki_content: str, excerpt: str, num_words: int, @@ -503,139 +594,136 @@ def cache_deck( """Store Anki deck in cache. Args: - filepath: Path to source file. - length: Excerpt length. - target_lang: Target language. - include_context: Whether context is included. - all_vocab: Whether all vocab is included. + key: Cache key parameters. anki_content: The deck content. excerpt: The excerpt text. num_words: Number of words. max_rank: Maximum rank. """ - try: - from python_pkg.word_frequency.cache import get_anki_deck_cache + get_anki_deck_cache().set( + key, + anki_content, + excerpt, + num_words, + max_rank, + ) - get_anki_deck_cache().set( - filepath, - length, - target_lang, - include_context, - all_vocab, - anki_content, - excerpt, - num_words, - max_rank, + +def _detect_source_language( + filepath: Path, + text: str, +) -> str: + """Auto-detect source language from file content. + + Args: + filepath: Path to source file. + text: Already-read text (may be empty). + + Returns: + Detected language code. + + Raises: + ValueError: If language cannot be detected. + """ + sample_text = read_file(filepath)[:1000] if not text else text[:1000] + detected = detect_language(sample_text) + if detected is None: + msg = ( + "Could not auto-detect source language. " + "Please specify with --from (e.g., --from pl for Polish). " + "Install langdetect for auto-detection: " + "pip install langdetect" ) - except ImportError: - pass + raise ValueError(msg) + return detected def generate_flashcards( filepath: str | Path, excerpt_length: int, - source_lang: str | None = None, - target_lang: str = "en", - include_context: bool = False, - deck_name: str | None = None, - all_vocab: bool = True, - no_translate: bool = False, + options: FlashcardOptions | None = None, *, - force: bool = False, + all_vocab: bool = True, ) -> tuple[str, str, int, int]: - """Generate Anki flashcards for vocabulary needed for an excerpt length. + """Generate Anki flashcards for vocabulary needed for an excerpt. Args: filepath: Path to the source text file. excerpt_length: Target excerpt length. - source_lang: Source language (auto-detected if None). - target_lang: Target language for translations. - include_context: Whether to include example contexts. - deck_name: Optional deck name. - all_vocab: If True, include ALL words from rank 1 to max rank needed. - If False, only include words that appear in the excerpt. - no_translate: If True, skip translation. - force: If True, ignore all caches and regenerate. + options: Flashcard generation options. + all_vocab: If True, include ALL words rank 1 to max rank. Returns: Tuple of (anki_content, excerpt, num_words, max_rank). """ + if options is None: + options = FlashcardOptions() filepath = Path(filepath) + deck_key = AnkiDeckKey( + filepath=filepath, + length=excerpt_length, + target_lang=options.target_lang, + include_context=options.include_context, + all_vocab=all_vocab, + ) # Check for cached full deck (if not using no_translate) - if not no_translate and not force: - cached = get_cached_deck( - filepath, excerpt_length, target_lang, include_context, all_vocab - ) + if not options.no_translate and not options.force: + cached = get_cached_deck(deck_key) if cached is not None: return cached # Read the text (only needed for context finding) - text = read_file(filepath) if include_context else "" + text = read_file(filepath) if options.include_context else "" # Auto-detect language if not provided + source_lang = options.source_lang if source_lang is None: - sample_text = read_file(filepath)[:1000] if not text else text[:1000] - source_lang = detect_language(sample_text) - if source_lang is None: - raise ValueError( - "Could not auto-detect source language. " - "Please specify with --from (e.g., --from pl for Polish). " - "Install langdetect for auto-detection: pip install langdetect" - ) + source_lang = _detect_source_language(filepath, text) # Run vocabulary curve analysis with vocab dump for all words - output = run_vocabulary_curve(filepath, excerpt_length, dump_vocab=all_vocab) - # Parse the output (now includes all vocabulary from C) + output = run_vocabulary_curve( + filepath, excerpt_length, dump_vocab=all_vocab + ) excerpt, excerpt_words, all_vocab_words = parse_vocabulary_curve_output( output, excerpt_length ) if not excerpt_words: - raise ValueError(f"No words found for excerpt length {excerpt_length}") + msg = f"No words found for excerpt length {excerpt_length}" + raise ValueError(msg) - # Find max rank needed max_rank = max(rank for _, rank in excerpt_words) + words_with_ranks = ( + all_vocab_words if all_vocab and all_vocab_words else excerpt_words + ) - # Use vocabulary from C output - if all_vocab and all_vocab_words: - words_with_ranks = all_vocab_words - else: - words_with_ranks = excerpt_words - - # Get contexts if requested contexts = None - if include_context: + if options.include_context: if not text: text = read_file(filepath) words = [w for w, _ in words_with_ranks] contexts = find_word_contexts(text, words) - # Generate deck name - if deck_name is None: - deck_name = f"{filepath.stem}_vocab_{excerpt_length}" + deck_name = options.deck_name or f"{filepath.stem}_vocab_{excerpt_length}" - # Generate Anki content anki_content = generate_anki_deck( - words_with_ranks, - source_lang, - target_lang, - contexts, - deck_name, - include_context, - no_translate, - excerpt, - excerpt_words, + DeckInput( + words_with_ranks=words_with_ranks, + source_lang=source_lang, + target_lang=options.target_lang, + contexts=contexts, + deck_name=deck_name, + ), + include_context=options.include_context, + no_translate=options.no_translate, + excerpt=excerpt, + excerpt_words=excerpt_words, ) - # Cache the full deck (if translated) - if not no_translate: + if not options.no_translate: cache_deck( - filepath, - excerpt_length, - target_lang, - include_context, - all_vocab, + deck_key, anki_content, excerpt, len(words_with_ranks), @@ -648,13 +736,7 @@ def generate_flashcards( def generate_flashcards_inverse( filepath: str | Path, max_vocab: int, - source_lang: str | None = None, - target_lang: str = "en", - include_context: bool = False, - deck_name: str | None = None, - no_translate: bool = False, - *, - force: bool = False, + options: FlashcardOptions | None = None, ) -> tuple[str, str, int, int, int]: """Generate Anki flashcards for the longest excerpt using top N words. @@ -664,95 +746,262 @@ def generate_flashcards_inverse( Args: filepath: Path to the source text file. max_vocab: Maximum vocabulary size (top N words to learn). - source_lang: Source language (auto-detected if None). - target_lang: Target language for translations. - include_context: Whether to include example contexts. - deck_name: Optional deck name. - no_translate: If True, skip translation. - force: If True, ignore all caches and regenerate. + options: Flashcard generation options. Returns: - Tuple of (anki_content, excerpt, excerpt_length, num_words, max_rank_used). + Tuple of (anki_content, excerpt, excerpt_length, + num_words, max_rank_used). """ + if options is None: + options = FlashcardOptions() filepath = Path(filepath) - # Read the text (only needed for context finding) - text = read_file(filepath) if include_context else "" + text = read_file(filepath) if options.include_context else "" - # Auto-detect language if not provided + source_lang = options.source_lang if source_lang is None: - sample_text = read_file(filepath)[:1000] if not text else text[:1000] - source_lang = detect_language(sample_text) - if source_lang is None: - raise ValueError( - "Could not auto-detect source language. " - "Please specify with --from (e.g., --from pl for Polish). " - "Install langdetect for auto-detection: pip install langdetect" - ) + source_lang = _detect_source_language(filepath, text) - # Run vocabulary curve in inverse mode - output = run_vocabulary_curve_inverse(filepath, max_vocab, dump_vocab=True) - - # Parse the output - excerpt, excerpt_length, max_rank_used, all_vocab_words = parse_inverse_mode_output( - output + output = run_vocabulary_curve_inverse( + filepath, max_vocab, dump_vocab=True + ) + excerpt, excerpt_length, max_rank_used, all_vocab_words = ( + parse_inverse_mode_output(output) ) if excerpt_length == 0: - raise ValueError( - f"No valid excerpt found using only top {max_vocab} words. " - "Try increasing the vocabulary limit." + msg = ( + f"No valid excerpt found using only top {max_vocab} " + "words. Try increasing the vocabulary limit." ) + raise ValueError(msg) if not all_vocab_words: - raise ValueError(f"No vocabulary returned for max_vocab={max_vocab}") + msg = f"No vocabulary returned for max_vocab={max_vocab}" + raise ValueError(msg) - # Use all vocabulary up to max_vocab words_with_ranks = all_vocab_words - # Find words that appear in the excerpt (for highlighting) excerpt_word_set = set(excerpt.lower().split()) excerpt_words = [ - (w, r) for w, r in all_vocab_words if w.lower() in excerpt_word_set + (w, r) + for w, r in all_vocab_words + if w.lower() in excerpt_word_set ] - # Get contexts if requested contexts = None - if include_context: + if options.include_context: if not text: text = read_file(filepath) words = [w for w, _ in words_with_ranks] contexts = find_word_contexts(text, words) - # Generate deck name - if deck_name is None: - deck_name = f"{filepath.stem}_top{max_vocab}" + deck_name = options.deck_name or f"{filepath.stem}_top{max_vocab}" - # Generate Anki content anki_content = generate_anki_deck( - words_with_ranks, - source_lang, - target_lang, - contexts, - deck_name, - include_context, - no_translate, - excerpt, - excerpt_words if excerpt_words else None, + DeckInput( + words_with_ranks=words_with_ranks, + source_lang=source_lang, + target_lang=options.target_lang, + contexts=contexts, + deck_name=deck_name, + ), + include_context=options.include_context, + no_translate=options.no_translate, + excerpt=excerpt, + excerpt_words=excerpt_words or None, ) - return anki_content, excerpt, excerpt_length, len(words_with_ranks), max_rank_used + return ( + anki_content, + excerpt, + excerpt_length, + len(words_with_ranks), + max_rank_used, + ) -def main(argv: Sequence[str] | None = None) -> int: - """Main entry point. +def _format_cache_size(value: int) -> str: + """Format a byte size as human-readable string.""" + if value < _ONE_KB: + return f"{value} B" + if value < _ONE_MB: + return f"{value / _ONE_KB:.1f} KB" + return f"{value / _ONE_MB:.1f} MB" + + +def _print_cache_stats() -> int: + """Print cache statistics and return exit code.""" + stats = get_all_cache_stats() + logger.info("Cache Statistics") + logger.info("=" * 50) + for cache_name, cache_stats in stats.items(): + logger.info("\n%s:", cache_name.upper()) + for key, value in cache_stats.items(): + if key == "cache_size_bytes": + logger.info(" %s: %s", key, _format_cache_size(value)) + else: + logger.info(" %s: %s", key, value) + return 0 + + +def _clear_caches() -> int: + """Clear all caches and return exit code.""" + clear_all_caches() + logger.info("All caches cleared.") + return 0 + + +def _log_anki_import_instructions(output_path: Path) -> None: + """Log Anki import instructions.""" + logger.info("") + logger.info("To import into Anki:") + logger.info(" 1. Open Anki") + logger.info(" 2. File -> Import") + logger.info(" 3. Select: %s", output_path) + logger.info(" 4. Click Import") + + +def _handle_inverse_mode( + args: argparse.Namespace, + filepath: Path, +) -> int: + """Handle inverse mode (--max-vocab) flashcard generation. Args: - argv: Command line arguments. + args: Parsed command line arguments. + filepath: Path to source file. Returns: Exit code. """ + if not args.quiet: + logger.info("Analyzing %s...", filepath.name) + logger.info( + "Finding longest excerpt using top %d words...", + args.max_vocab, + ) + + anki_content, excerpt, excerpt_length, num_words, max_rank_used = ( + generate_flashcards_inverse( + filepath, + args.max_vocab, + FlashcardOptions( + source_lang=args.source_lang, + target_lang=args.target_lang, + deck_name=args.deck_name, + include_context=args.include_context, + no_translate=args.no_translate, + force=args.force, + ), + ) + ) + + output_path = ( + Path(args.output) + if args.output + else filepath.parent + / f"{filepath.stem}_anki_top{args.max_vocab}.txt" + ) + output_path.write_text(anki_content, encoding="utf-8") + + if not args.quiet: + logger.info("") + logger.info("=" * 60) + logger.info("FLASHCARD GENERATION COMPLETE (INVERSE MODE)") + logger.info("=" * 60) + logger.info("Learning: top %d words", args.max_vocab) + logger.info( + "Longest excerpt you can understand: %d words", + excerpt_length, + ) + logger.info(' "%s"', excerpt) + logger.info("") + logger.info("Rarest word in excerpt: #%d", max_rank_used) + logger.info("Flashcards: %d", num_words) + logger.info("Output file: %s", output_path) + _log_anki_import_instructions(output_path) + else: + logger.info("%s", output_path) + + return 0 + + +def _handle_normal_mode( + args: argparse.Namespace, + filepath: Path, +) -> int: + """Handle normal mode (--length) flashcard generation. + + Args: + args: Parsed command line arguments. + filepath: Path to source file. + + Returns: + Exit code. + """ + if not args.quiet: + logger.info("Analyzing %s...", filepath.name) + logger.info( + "Finding vocabulary for %d-word excerpt...", args.length + ) + + anki_content, excerpt, num_words, max_rank = generate_flashcards( + filepath, + args.length, + FlashcardOptions( + source_lang=args.source_lang, + target_lang=args.target_lang, + deck_name=args.deck_name, + include_context=args.include_context, + no_translate=args.no_translate, + force=args.force, + ), + all_vocab=not args.excerpt_words_only, + ) + + output_path = ( + Path(args.output) + if args.output + else filepath.parent / f"{filepath.stem}_anki_{args.length}.txt" + ) + output_path.write_text(anki_content, encoding="utf-8") + + if not args.quiet: + logger.info("") + logger.info("=" * 60) + logger.info("FLASHCARD GENERATION COMPLETE") + logger.info("=" * 60) + logger.info( + "Excerpt to understand (%d words):", args.length + ) + logger.info(' "%s"', excerpt) + logger.info("") + logger.info("Max word rank needed: #%d", max_rank) + if args.excerpt_words_only: + logger.info( + "Flashcards: %d (excerpt words only)", num_words + ) + else: + logger.info( + "Flashcards: %d (ALL words rank #1 to #%d)", + num_words, + max_rank, + ) + logger.info("Output file: %s", output_path) + _log_anki_import_instructions(output_path) + else: + logger.info("%s", output_path) + + return 0 + + +def _build_parser() -> argparse.ArgumentParser: + """Build the argument parser for the CLI. + + Returns: + Configured argument parser. + """ parser = argparse.ArgumentParser( description="Generate Anki flashcards from vocabulary analysis.", formatter_class=argparse.RawDescriptionHelpFormatter, @@ -771,21 +1020,30 @@ def main(argv: Sequence[str] | None = None) -> int: "-l", type=int, default=None, - help="Target excerpt length (how many words you want to understand)", + help=( + "Target excerpt length " + "(how many words you want to understand)" + ), ) parser.add_argument( "--max-vocab", "-v", type=int, default=None, - help="INVERSE MODE: Learn top N words, find longest excerpt you can understand", + help=( + "INVERSE MODE: Learn top N words, " + "find longest excerpt you can understand" + ), ) parser.add_argument( "--from", dest="source_lang", type=str, default=None, - help="Source language code (e.g., 'pl', 'la', 'de'). Auto-detected if not specified.", + help=( + "Source language code (e.g., 'pl', 'la', 'de'). " + "Auto-detected if not specified." + ), ) parser.add_argument( "--to", @@ -825,7 +1083,10 @@ def main(argv: Sequence[str] | None = None) -> int: "--excerpt-words-only", "-e", action="store_true", - help="Only include words that appear in the excerpt (default: include ALL words up to max rank)", + help=( + "Only include words that appear in the excerpt " + "(default: include ALL words up to max rank)" + ), ) parser.add_argument( "--no-translate", @@ -849,179 +1110,64 @@ def main(argv: Sequence[str] | None = None) -> int: action="store_true", help="Clear all caches and exit", ) + return parser + +def _run_generation(args: argparse.Namespace) -> int: + """Validate args and run flashcard generation. + + Args: + args: Parsed command line arguments. + + Returns: + Exit code. + """ + filepath = Path(args.file) + if not filepath.exists(): + logger.error("Error: File not found: %s", args.file) + return 1 + + if args.max_vocab is not None: + return _handle_inverse_mode(args, filepath) + return _handle_normal_mode(args, filepath) + + +def main(argv: Sequence[str] | None = None) -> int: + """Main entry point. + + Args: + argv: Command line arguments. + + Returns: + Exit code. + """ + parser = _build_parser() args = parser.parse_args(argv) - # Handle cache management commands if args.cache_stats: - try: - from python_pkg.word_frequency.cache import get_all_cache_stats - except ImportError: - try: - from cache import get_all_cache_stats - except ImportError: - print("Cache module not available", file=sys.stderr) - return 1 - stats = get_all_cache_stats() - print("Cache Statistics") - print("=" * 50) - for cache_name, cache_stats in stats.items(): - print(f"\n{cache_name.upper()}:") - for key, value in cache_stats.items(): - if key == "cache_size_bytes": - if value < 1024: - size_str = f"{value} B" - elif value < 1024 * 1024: - size_str = f"{value / 1024:.1f} KB" - else: - size_str = f"{value / (1024 * 1024):.1f} MB" - print(f" {key}: {size_str}") - else: - print(f" {key}: {value}") - return 0 + return _print_cache_stats() if args.clear_cache: - try: - from python_pkg.word_frequency.cache import clear_all_caches - except ImportError: - try: - from cache import clear_all_caches - except ImportError: - print("Cache module not available", file=sys.stderr) - return 1 - clear_all_caches() - print("All caches cleared.") - return 0 + return _clear_caches() - # Validate required arguments for main functionality if args.file is None: parser.error("--file/-f is required") if args.length is None and args.max_vocab is None: parser.error("Either --length/-l or --max-vocab/-v is required") if args.length is not None and args.max_vocab is not None: - parser.error("Cannot use both --length and --max-vocab. Choose one mode.") - - try: - filepath = Path(args.file) - if not filepath.exists(): - print(f"Error: File not found: {args.file}", file=sys.stderr) - return 1 - - # INVERSE MODE: --max-vocab - if args.max_vocab is not None: - if not args.quiet: - print(f"Analyzing {filepath.name}...") - print(f"Finding longest excerpt using top {args.max_vocab} words...") - - # Generate flashcards in inverse mode - anki_content, excerpt, excerpt_length, num_words, max_rank_used = ( - generate_flashcards_inverse( - filepath, - args.max_vocab, - source_lang=args.source_lang, - target_lang=args.target_lang, - include_context=args.include_context, - deck_name=args.deck_name, - no_translate=args.no_translate, - force=args.force, - ) - ) - - # Determine output path - if args.output: - output_path = Path(args.output) - else: - output_path = ( - filepath.parent / f"{filepath.stem}_anki_top{args.max_vocab}.txt" - ) - - # Write output - output_path.write_text(anki_content, encoding="utf-8") - - if not args.quiet: - print() - print("=" * 60) - print("FLASHCARD GENERATION COMPLETE (INVERSE MODE)") - print("=" * 60) - print(f"Learning: top {args.max_vocab} words") - print(f"Longest excerpt you can understand: {excerpt_length} words") - print(f' "{excerpt}"') - print() - print(f"Rarest word in excerpt: #{max_rank_used}") - print(f"Flashcards: {num_words}") - print(f"Output file: {output_path}") - print() - print("To import into Anki:") - print(" 1. Open Anki") - print(" 2. File -> Import") - print(f" 3. Select: {output_path}") - print(" 4. Click Import") - else: - print(output_path) - - return 0 - - # NORMAL MODE: --length - if not args.quiet: - print(f"Analyzing {filepath.name}...") - print(f"Finding vocabulary for {args.length}-word excerpt...") - - # Generate flashcards - anki_content, excerpt, num_words, max_rank = generate_flashcards( - filepath, - args.length, - source_lang=args.source_lang, - target_lang=args.target_lang, - include_context=args.include_context, - deck_name=args.deck_name, - all_vocab=not args.excerpt_words_only, - no_translate=args.no_translate, - force=args.force, + parser.error( + "Cannot use both --length and --max-vocab. Choose one mode." ) - # Determine output path - if args.output: - output_path = Path(args.output) - else: - output_path = filepath.parent / f"{filepath.stem}_anki_{args.length}.txt" - - # Write output - output_path.write_text(anki_content, encoding="utf-8") - - if not args.quiet: - print() - print("=" * 60) - print("FLASHCARD GENERATION COMPLETE") - print("=" * 60) - print(f"Excerpt to understand ({args.length} words):") - print(f' "{excerpt}"') - print() - print(f"Max word rank needed: #{max_rank}") - if args.excerpt_words_only: - print(f"Flashcards: {num_words} (excerpt words only)") - else: - print(f"Flashcards: {num_words} (ALL words rank #1 to #{max_rank})") - print(f"Output file: {output_path}") - print() - print("To import into Anki:") - print(" 1. Open Anki") - print(" 2. File -> Import") - print(f" 3. Select: {output_path}") - print(" 4. Click Import") - else: - print(output_path) - - return 0 - - except FileNotFoundError as e: - print(f"Error: {e}", file=sys.stderr) - return 1 - except subprocess.CalledProcessError as e: - print(f"Error running vocabulary_curve: {e}", file=sys.stderr) - return 1 - except ValueError as e: - print(f"Error: {e}", file=sys.stderr) - return 1 + try: + return _run_generation(args) + except FileNotFoundError: + logger.exception("File not found") + except subprocess.CalledProcessError: + logger.exception("Error running vocabulary_curve") + except ValueError: + logger.exception("Value error") + return 1 if __name__ == "__main__": diff --git a/python_pkg/word_frequency/cache.py b/python_pkg/word_frequency/cache.py index 75f4002..67e03fc 100755 --- a/python_pkg/word_frequency/cache.py +++ b/python_pkg/word_frequency/cache.py @@ -11,15 +11,23 @@ Cache location: ~/.cache/word_frequency/ from __future__ import annotations +import argparse +from dataclasses import dataclass import hashlib import json +import logging import os from pathlib import Path from typing import Any +logger = logging.getLogger(__name__) + # Default cache directory DEFAULT_CACHE_DIR = Path.home() / ".cache" / "word_frequency" +_ONE_KB = 1024 +_ONE_MB = 1024 * 1024 + def get_cache_dir() -> Path: """Get the cache directory, creating it if needed. @@ -42,7 +50,7 @@ def get_file_hash(filepath: Path) -> str: Hex digest of file hash. """ hasher = hashlib.sha256() - with open(filepath, "rb") as f: + with filepath.open("rb") as f: # Read in chunks for large files for chunk in iter(lambda: f.read(65536), b""): hasher.update(chunk) @@ -274,14 +282,15 @@ class VocabCurveCache: try: data = json.loads(cache_path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, KeyError, OSError): + return None + else: # Verify hash matches if data.get("file_hash") != file_hash: return None excerpt = data["excerpt"] words = [(w, r) for w, r in data["words"]] return excerpt, words - except (json.JSONDecodeError, KeyError, OSError): - return None def set( self, @@ -339,6 +348,17 @@ class VocabCurveCache: # ============================================================================= +@dataclass(frozen=True) +class AnkiDeckKey: + """Key parameters for Anki deck cache lookups.""" + + filepath: Path + length: int + target_lang: str + include_context: bool + all_vocab: bool + + class AnkiDeckCache: """Cache for generated Anki decks.""" @@ -380,6 +400,7 @@ class AnkiDeckCache: file_hash: str, length: int, target_lang: str, + *, include_context: bool, all_vocab: bool, ) -> str: @@ -400,36 +421,35 @@ class AnkiDeckCache: def get( self, - filepath: Path, - length: int, - target_lang: str, - include_context: bool, - all_vocab: bool, + key: AnkiDeckKey, ) -> tuple[str, str, int, int] | None: """Get cached Anki deck. Args: - filepath: Path to source file. - length: Excerpt length. - target_lang: Target language. - include_context: Whether context is included. - all_vocab: Whether all vocab is included. + key: Cache key parameters. Returns: - Tuple of (anki_content, excerpt, num_words, max_rank) or None. + Tuple of (anki_content, excerpt, num_words, max_rank) + or None. """ - file_hash = get_file_hash(filepath) - key = self._make_key(file_hash, length, target_lang, include_context, all_vocab) + file_hash = get_file_hash(key.filepath) + cache_key = self._make_key( + file_hash, + key.length, + key.target_lang, + include_context=key.include_context, + all_vocab=key.all_vocab, + ) metadata = self._load_metadata() - if key not in metadata: + if cache_key not in metadata: return None - entry = metadata[key] + entry = metadata[cache_key] if entry.get("file_hash") != file_hash: return None - deck_file = self.cache_dir / f"{key}.txt" + deck_file = self.cache_dir / f"{cache_key}.txt" if not deck_file.exists(): return None @@ -446,11 +466,7 @@ class AnkiDeckCache: def set( self, - filepath: Path, - length: int, - target_lang: str, - include_context: bool, - all_vocab: bool, + key: AnkiDeckKey, anki_content: str, excerpt: str, num_words: int, @@ -459,32 +475,34 @@ class AnkiDeckCache: """Store Anki deck in cache. Args: - filepath: Path to source file. - length: Excerpt length. - target_lang: Target language. - include_context: Whether context is included. - all_vocab: Whether all vocab is included. + key: Cache key parameters. anki_content: The Anki deck content. excerpt: The excerpt text. num_words: Number of words in deck. max_rank: Maximum word rank. """ - file_hash = get_file_hash(filepath) - key = self._make_key(file_hash, length, target_lang, include_context, all_vocab) + file_hash = get_file_hash(key.filepath) + cache_key = self._make_key( + file_hash, + key.length, + key.target_lang, + include_context=key.include_context, + all_vocab=key.all_vocab, + ) # Save deck content - deck_file = self.cache_dir / f"{key}.txt" + deck_file = self.cache_dir / f"{cache_key}.txt" deck_file.write_text(anki_content, encoding="utf-8") # Update metadata metadata = self._load_metadata() - metadata[key] = { + metadata[cache_key] = { "file_hash": file_hash, - "filepath": str(filepath), - "length": length, - "target_lang": target_lang, - "include_context": include_context, - "all_vocab": all_vocab, + "filepath": str(key.filepath), + "length": key.length, + "target_lang": key.target_lang, + "include_context": key.include_context, + "all_vocab": key.all_vocab, "excerpt": excerpt, "num_words": num_words, "max_rank": max_rank, @@ -519,34 +537,33 @@ class AnkiDeckCache: # Global Cache Instances # ============================================================================= -# Singleton instances -_translation_cache: TranslationCache | None = None -_vocab_curve_cache: VocabCurveCache | None = None -_anki_deck_cache: AnkiDeckCache | None = None +class _CacheHolder: + """Holds singleton cache instances.""" + + translation: TranslationCache | None = None + vocab_curve: VocabCurveCache | None = None + anki_deck: AnkiDeckCache | None = None def get_translation_cache() -> TranslationCache: """Get the global translation cache instance.""" - global _translation_cache - if _translation_cache is None: - _translation_cache = TranslationCache() - return _translation_cache + if _CacheHolder.translation is None: + _CacheHolder.translation = TranslationCache() + return _CacheHolder.translation def get_vocab_curve_cache() -> VocabCurveCache: """Get the global vocabulary curve cache instance.""" - global _vocab_curve_cache - if _vocab_curve_cache is None: - _vocab_curve_cache = VocabCurveCache() - return _vocab_curve_cache + if _CacheHolder.vocab_curve is None: + _CacheHolder.vocab_curve = VocabCurveCache() + return _CacheHolder.vocab_curve def get_anki_deck_cache() -> AnkiDeckCache: """Get the global Anki deck cache instance.""" - global _anki_deck_cache - if _anki_deck_cache is None: - _anki_deck_cache = AnkiDeckCache() - return _anki_deck_cache + if _CacheHolder.anki_deck is None: + _CacheHolder.anki_deck = AnkiDeckCache() + return _CacheHolder.anki_deck def clear_all_caches() -> None: @@ -575,8 +592,6 @@ def main() -> int: Returns: Exit code. """ - import argparse - parser = argparse.ArgumentParser(description="Manage word frequency caches") parser.add_argument("--stats", action="store_true", help="Show cache statistics") parser.add_argument("--clear", action="store_true", help="Clear all caches") @@ -594,42 +609,42 @@ def main() -> int: if args.clear: clear_all_caches() - print("All caches cleared.") + logger.info("All caches cleared.") return 0 if args.clear_translations: get_translation_cache().clear() - print("Translation cache cleared.") + logger.info("Translation cache cleared.") return 0 if args.clear_excerpts: get_vocab_curve_cache().clear() - print("Excerpt cache cleared.") + logger.info("Excerpt cache cleared.") return 0 if args.clear_anki: get_anki_deck_cache().clear() - print("Anki deck cache cleared.") + logger.info("Anki deck cache cleared.") return 0 # Default: show stats stats = get_all_cache_stats() - print("Cache Statistics") - print("=" * 50) + logger.info("Cache Statistics") + logger.info("=" * 50) for cache_name, cache_stats in stats.items(): - print(f"\n{cache_name.upper()}:") + logger.info("\n%s:", cache_name.upper()) for key, value in cache_stats.items(): if key == "cache_size_bytes": # Format as human-readable - if value < 1024: + if value < _ONE_KB: size_str = f"{value} B" - elif value < 1024 * 1024: - size_str = f"{value / 1024:.1f} KB" + elif value < _ONE_MB: + size_str = f"{value / _ONE_KB:.1f} KB" else: - size_str = f"{value / (1024 * 1024):.1f} MB" - print(f" {key}: {size_str}") + size_str = f"{value / _ONE_MB:.1f} MB" + logger.info(" %s: %s", key, size_str) else: - print(f" {key}: {value}") + logger.info(" %s: %s", key, value) return 0 diff --git a/python_pkg/word_frequency/cache.py.bak b/python_pkg/word_frequency/cache.py.bak new file mode 100755 index 0000000..75f4002 --- /dev/null +++ b/python_pkg/word_frequency/cache.py.bak @@ -0,0 +1,640 @@ +#!/usr/bin/env python3 +"""Caching utilities for word frequency analysis. + +Provides disk-based caching for: +- Translations (word -> translation mappings) +- Vocabulary curve excerpts (file + length -> excerpt + words) +- Generated Anki decks + +Cache location: ~/.cache/word_frequency/ +""" + +from __future__ import annotations + +import hashlib +import json +import os +from pathlib import Path +from typing import Any + +# Default cache directory +DEFAULT_CACHE_DIR = Path.home() / ".cache" / "word_frequency" + + +def get_cache_dir() -> Path: + """Get the cache directory, creating it if needed. + + Returns: + Path to cache directory. + """ + cache_dir = Path(os.environ.get("WORD_FREQ_CACHE_DIR", str(DEFAULT_CACHE_DIR))) + cache_dir.mkdir(parents=True, exist_ok=True) + return cache_dir + + +def get_file_hash(filepath: Path) -> str: + """Compute SHA256 hash of a file's contents. + + Args: + filepath: Path to file. + + Returns: + Hex digest of file hash. + """ + hasher = hashlib.sha256() + with open(filepath, "rb") as f: + # Read in chunks for large files + for chunk in iter(lambda: f.read(65536), b""): + hasher.update(chunk) + return hasher.hexdigest() + + +def get_text_hash(text: str) -> str: + """Compute SHA256 hash of text content. + + Args: + text: Text to hash. + + Returns: + Hex digest of text hash. + """ + return hashlib.sha256(text.encode("utf-8")).hexdigest() + + +# ============================================================================= +# Translation Cache +# ============================================================================= + + +class TranslationCache: + """Cache for word translations.""" + + def __init__(self, cache_dir: Path | None = None) -> None: + """Initialize translation cache. + + Args: + cache_dir: Optional custom cache directory. + """ + self.cache_dir = cache_dir or get_cache_dir() + self.cache_file = self.cache_dir / "translations.json" + self._cache: dict[str, str] | None = None + self._dirty = False # Track if cache needs saving + + def _load_cache(self) -> dict[str, str]: + """Load cache from disk.""" + if self._cache is None: + if self.cache_file.exists(): + try: + self._cache = json.loads( + self.cache_file.read_text(encoding="utf-8") + ) + except (json.JSONDecodeError, OSError): + self._cache = {} + else: + self._cache = {} + return self._cache + + def _save_cache(self) -> None: + """Save cache to disk if dirty.""" + if self._cache is not None and self._dirty: + self.cache_file.write_text( + json.dumps(self._cache, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + self._dirty = False + + def flush(self) -> None: + """Force save cache to disk.""" + self._save_cache() + + @staticmethod + def _make_key(word: str, source_lang: str, target_lang: str) -> str: + """Create cache key for a translation. + + Args: + word: Word to translate. + source_lang: Source language code. + target_lang: Target language code. + + Returns: + Cache key string. + """ + return f"{source_lang}:{target_lang}:{word.lower()}" + + def get(self, word: str, source_lang: str, target_lang: str) -> str | None: + """Get cached translation. + + Args: + word: Word to look up. + source_lang: Source language code. + target_lang: Target language code. + + Returns: + Cached translation or None if not found. + """ + cache = self._load_cache() + key = self._make_key(word, source_lang, target_lang) + return cache.get(key) + + def set( + self, + word: str, + source_lang: str, + target_lang: str, + translation: str, + *, + auto_save: bool = False, + ) -> None: + """Store translation in cache. + + Args: + word: Original word. + source_lang: Source language code. + target_lang: Target language code. + translation: Translated word. + auto_save: If True, save to disk immediately. + """ + cache = self._load_cache() + key = self._make_key(word, source_lang, target_lang) + cache[key] = translation + self._dirty = True + if auto_save: + self._save_cache() + + def get_many( + self, words: list[str], source_lang: str, target_lang: str + ) -> dict[str, str]: + """Get multiple cached translations. + + Args: + words: Words to look up. + source_lang: Source language code. + target_lang: Target language code. + + Returns: + Dict mapping words to their cached translations. + """ + cache = self._load_cache() + result: dict[str, str] = {} + for word in words: + key = self._make_key(word, source_lang, target_lang) + if key in cache: + result[word.lower()] = cache[key] + return result + + def set_many( + self, + translations: dict[str, str], + source_lang: str, + target_lang: str, + ) -> None: + """Store multiple translations in cache and save to disk. + + Args: + translations: Dict mapping words to translations. + source_lang: Source language code. + target_lang: Target language code. + """ + cache = self._load_cache() + for word, translation in translations.items(): + key = self._make_key(word, source_lang, target_lang) + cache[key] = translation + self._dirty = True + self._save_cache() # Save once after all additions + + def clear(self) -> None: + """Clear all cached translations.""" + self._cache = {} + self._dirty = False + if self.cache_file.exists(): + self.cache_file.unlink() + + def stats(self) -> dict[str, Any]: + """Get cache statistics. + + Returns: + Dict with cache stats. + """ + cache = self._load_cache() + return { + "total_entries": len(cache), + "cache_file": str(self.cache_file), + "cache_size_bytes": ( + self.cache_file.stat().st_size if self.cache_file.exists() else 0 + ), + } + + +# ============================================================================= +# Vocabulary Curve Cache +# ============================================================================= + + +class VocabCurveCache: + """Cache for vocabulary curve analysis results.""" + + def __init__(self, cache_dir: Path | None = None) -> None: + """Initialize vocabulary curve cache. + + Args: + cache_dir: Optional custom cache directory. + """ + self.cache_dir = (cache_dir or get_cache_dir()) / "excerpts" + self.cache_dir.mkdir(parents=True, exist_ok=True) + + def _get_cache_path(self, file_hash: str, length: int) -> Path: + """Get path to cache file for given hash and length. + + Args: + file_hash: Hash of source file. + length: Excerpt length. + + Returns: + Path to cache file. + """ + return self.cache_dir / f"{file_hash[:16]}_{length}.json" + + def get( + self, filepath: Path, length: int + ) -> tuple[str, list[tuple[str, int]]] | None: + """Get cached excerpt and words for a file and length. + + Args: + filepath: Path to source file. + length: Excerpt length. + + Returns: + Tuple of (excerpt, words_with_ranks) or None if not cached. + """ + file_hash = get_file_hash(filepath) + cache_path = self._get_cache_path(file_hash, length) + + if not cache_path.exists(): + return None + + try: + data = json.loads(cache_path.read_text(encoding="utf-8")) + # Verify hash matches + if data.get("file_hash") != file_hash: + return None + excerpt = data["excerpt"] + words = [(w, r) for w, r in data["words"]] + return excerpt, words + except (json.JSONDecodeError, KeyError, OSError): + return None + + def set( + self, + filepath: Path, + length: int, + excerpt: str, + words: list[tuple[str, int]], + ) -> None: + """Store excerpt and words in cache. + + Args: + filepath: Path to source file. + length: Excerpt length. + excerpt: The excerpt text. + words: List of (word, rank) tuples. + """ + file_hash = get_file_hash(filepath) + cache_path = self._get_cache_path(file_hash, length) + + data = { + "file_hash": file_hash, + "filepath": str(filepath), + "length": length, + "excerpt": excerpt, + "words": [[w, r] for w, r in words], + } + + cache_path.write_text( + json.dumps(data, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + + def clear(self) -> None: + """Clear all cached excerpts.""" + for cache_file in self.cache_dir.glob("*.json"): + cache_file.unlink() + + def stats(self) -> dict[str, Any]: + """Get cache statistics. + + Returns: + Dict with cache stats. + """ + cache_files = list(self.cache_dir.glob("*.json")) + total_size = sum(f.stat().st_size for f in cache_files) + return { + "total_entries": len(cache_files), + "cache_dir": str(self.cache_dir), + "cache_size_bytes": total_size, + } + + +# ============================================================================= +# Anki Deck Cache +# ============================================================================= + + +class AnkiDeckCache: + """Cache for generated Anki decks.""" + + def __init__(self, cache_dir: Path | None = None) -> None: + """Initialize Anki deck cache. + + Args: + cache_dir: Optional custom cache directory. + """ + self.cache_dir = (cache_dir or get_cache_dir()) / "anki_decks" + self.cache_dir.mkdir(parents=True, exist_ok=True) + self.metadata_file = self.cache_dir / "metadata.json" + self._metadata: dict[str, Any] | None = None + + def _load_metadata(self) -> dict[str, Any]: + """Load metadata from disk.""" + if self._metadata is None: + if self.metadata_file.exists(): + try: + self._metadata = json.loads( + self.metadata_file.read_text(encoding="utf-8") + ) + except (json.JSONDecodeError, OSError): + self._metadata = {} + else: + self._metadata = {} + return self._metadata + + def _save_metadata(self) -> None: + """Save metadata to disk.""" + if self._metadata is not None: + self.metadata_file.write_text( + json.dumps(self._metadata, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + + @staticmethod + def _make_key( + file_hash: str, + length: int, + target_lang: str, + include_context: bool, + all_vocab: bool, + ) -> str: + """Create cache key for an Anki deck. + + Args: + file_hash: Hash of source file. + length: Excerpt length. + target_lang: Target language. + include_context: Whether context is included. + all_vocab: Whether all vocab is included. + + Returns: + Cache key string. + """ + flags = f"ctx{int(include_context)}_all{int(all_vocab)}" + return f"{file_hash[:16]}_{length}_{target_lang}_{flags}" + + def get( + self, + filepath: Path, + length: int, + target_lang: str, + include_context: bool, + all_vocab: bool, + ) -> tuple[str, str, int, int] | None: + """Get cached Anki deck. + + Args: + filepath: Path to source file. + length: Excerpt length. + target_lang: Target language. + include_context: Whether context is included. + all_vocab: Whether all vocab is included. + + Returns: + Tuple of (anki_content, excerpt, num_words, max_rank) or None. + """ + file_hash = get_file_hash(filepath) + key = self._make_key(file_hash, length, target_lang, include_context, all_vocab) + metadata = self._load_metadata() + + if key not in metadata: + return None + + entry = metadata[key] + if entry.get("file_hash") != file_hash: + return None + + deck_file = self.cache_dir / f"{key}.txt" + if not deck_file.exists(): + return None + + try: + content = deck_file.read_text(encoding="utf-8") + return ( + content, + entry["excerpt"], + entry["num_words"], + entry["max_rank"], + ) + except OSError: + return None + + def set( + self, + filepath: Path, + length: int, + target_lang: str, + include_context: bool, + all_vocab: bool, + anki_content: str, + excerpt: str, + num_words: int, + max_rank: int, + ) -> None: + """Store Anki deck in cache. + + Args: + filepath: Path to source file. + length: Excerpt length. + target_lang: Target language. + include_context: Whether context is included. + all_vocab: Whether all vocab is included. + anki_content: The Anki deck content. + excerpt: The excerpt text. + num_words: Number of words in deck. + max_rank: Maximum word rank. + """ + file_hash = get_file_hash(filepath) + key = self._make_key(file_hash, length, target_lang, include_context, all_vocab) + + # Save deck content + deck_file = self.cache_dir / f"{key}.txt" + deck_file.write_text(anki_content, encoding="utf-8") + + # Update metadata + metadata = self._load_metadata() + metadata[key] = { + "file_hash": file_hash, + "filepath": str(filepath), + "length": length, + "target_lang": target_lang, + "include_context": include_context, + "all_vocab": all_vocab, + "excerpt": excerpt, + "num_words": num_words, + "max_rank": max_rank, + } + self._save_metadata() + + def clear(self) -> None: + """Clear all cached decks.""" + self._metadata = {} + for cache_file in self.cache_dir.glob("*.txt"): + cache_file.unlink() + if self.metadata_file.exists(): + self.metadata_file.unlink() + + def stats(self) -> dict[str, Any]: + """Get cache statistics. + + Returns: + Dict with cache stats. + """ + metadata = self._load_metadata() + cache_files = list(self.cache_dir.glob("*.txt")) + total_size = sum(f.stat().st_size for f in cache_files) + return { + "total_entries": len(metadata), + "cache_dir": str(self.cache_dir), + "cache_size_bytes": total_size, + } + + +# ============================================================================= +# Global Cache Instances +# ============================================================================= + +# Singleton instances +_translation_cache: TranslationCache | None = None +_vocab_curve_cache: VocabCurveCache | None = None +_anki_deck_cache: AnkiDeckCache | None = None + + +def get_translation_cache() -> TranslationCache: + """Get the global translation cache instance.""" + global _translation_cache + if _translation_cache is None: + _translation_cache = TranslationCache() + return _translation_cache + + +def get_vocab_curve_cache() -> VocabCurveCache: + """Get the global vocabulary curve cache instance.""" + global _vocab_curve_cache + if _vocab_curve_cache is None: + _vocab_curve_cache = VocabCurveCache() + return _vocab_curve_cache + + +def get_anki_deck_cache() -> AnkiDeckCache: + """Get the global Anki deck cache instance.""" + global _anki_deck_cache + if _anki_deck_cache is None: + _anki_deck_cache = AnkiDeckCache() + return _anki_deck_cache + + +def clear_all_caches() -> None: + """Clear all caches.""" + get_translation_cache().clear() + get_vocab_curve_cache().clear() + get_anki_deck_cache().clear() + + +def get_all_cache_stats() -> dict[str, dict[str, Any]]: + """Get statistics for all caches. + + Returns: + Dict with stats for each cache type. + """ + return { + "translations": get_translation_cache().stats(), + "vocab_curves": get_vocab_curve_cache().stats(), + "anki_decks": get_anki_deck_cache().stats(), + } + + +def main() -> int: + """CLI for cache management. + + Returns: + Exit code. + """ + import argparse + + parser = argparse.ArgumentParser(description="Manage word frequency caches") + parser.add_argument("--stats", action="store_true", help="Show cache statistics") + parser.add_argument("--clear", action="store_true", help="Clear all caches") + parser.add_argument( + "--clear-translations", action="store_true", help="Clear translation cache" + ) + parser.add_argument( + "--clear-excerpts", action="store_true", help="Clear excerpt cache" + ) + parser.add_argument( + "--clear-anki", action="store_true", help="Clear Anki deck cache" + ) + + args = parser.parse_args() + + if args.clear: + clear_all_caches() + print("All caches cleared.") + return 0 + + if args.clear_translations: + get_translation_cache().clear() + print("Translation cache cleared.") + return 0 + + if args.clear_excerpts: + get_vocab_curve_cache().clear() + print("Excerpt cache cleared.") + return 0 + + if args.clear_anki: + get_anki_deck_cache().clear() + print("Anki deck cache cleared.") + return 0 + + # Default: show stats + stats = get_all_cache_stats() + print("Cache Statistics") + print("=" * 50) + for cache_name, cache_stats in stats.items(): + print(f"\n{cache_name.upper()}:") + for key, value in cache_stats.items(): + if key == "cache_size_bytes": + # Format as human-readable + if value < 1024: + size_str = f"{value} B" + elif value < 1024 * 1024: + size_str = f"{value / 1024:.1f} KB" + else: + size_str = f"{value / (1024 * 1024):.1f} MB" + print(f" {key}: {size_str}") + else: + print(f" {key}: {value}") + + return 0 + + +if __name__ == "__main__": + import sys + + sys.exit(main()) diff --git a/python_pkg/word_frequency/excerpt_finder.py b/python_pkg/word_frequency/excerpt_finder.py index 7f92e75..fcbd765 100755 --- a/python_pkg/word_frequency/excerpt_finder.py +++ b/python_pkg/word_frequency/excerpt_finder.py @@ -6,21 +6,28 @@ specified length (in words) where the target words appear most frequently. Usage: # From raw text with target words - python -m python_pkg.word_frequency.excerpt_finder --text "they went somewhere he and she and the guy" --words and the --length 3 + python -m python_pkg.word_frequency.excerpt_finder \ + --text "they went somewhere he and she and the guy" \ + --words and the --length 3 # From a file - python -m python_pkg.word_frequency.excerpt_finder --file path/to/file.txt --words the and of --length 10 + python -m python_pkg.word_frequency.excerpt_finder \ + --file path/to/file.txt --words the and of --length 10 # Target words from a file (one word per line) - python -m python_pkg.word_frequency.excerpt_finder --file text.txt --words-file targets.txt --length 20 + python -m python_pkg.word_frequency.excerpt_finder \ + --file text.txt --words-file targets.txt --length 20 # Show top N excerpts instead of just the best one - python -m python_pkg.word_frequency.excerpt_finder --file text.txt --words the and --length 10 --top 5 + python -m python_pkg.word_frequency.excerpt_finder \ + --file text.txt --words the and --length 10 --top 5 """ from __future__ import annotations import argparse +from dataclasses import dataclass +import logging from pathlib import Path import sys from typing import TYPE_CHECKING, NamedTuple @@ -33,6 +40,17 @@ except ModuleNotFoundError: if TYPE_CHECKING: from collections.abc import Sequence +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class ExcerptSearchOptions: + """Options for excerpt search and display.""" + + case_sensitive: bool = False + top_n: int = 1 + context_words: int = 0 + class ExcerptResult(NamedTuple): """Result of an excerpt search.""" @@ -141,45 +159,28 @@ def find_best_excerpt( return output -def find_best_excerpt_with_context( +def _expand_results_with_context( text: str, - target_words: Sequence[str], - excerpt_length: int, + base_results: list[ExcerptResult], + context_words: int, *, case_sensitive: bool = False, - top_n: int = 1, - context_words: int = 0, ) -> list[ExcerptResult]: - """Find the excerpt(s) with optional surrounding context. + """Expand excerpt results with surrounding context words. Args: - text: The input text to search. - target_words: Words to search for in the excerpt. - excerpt_length: Length of the excerpt in words. - case_sensitive: If False, match words case-insensitively. - top_n: Number of top excerpts to return. - context_words: Number of words to include before/after the excerpt. + text: The full source text. + base_results: Results from find_best_excerpt. + context_words: Number of words to include before/after. + case_sensitive: If False, words are lowercased. Returns: - List of ExcerptResult with context included in the excerpt. + Expanded ExcerptResult list with context. """ - base_results = find_best_excerpt( - text, - target_words, - excerpt_length, - case_sensitive=case_sensitive, - top_n=top_n, - ) - - if context_words <= 0: - return base_results - - # Re-extract all words to get context all_words = extract_words(text, case_sensitive=case_sensitive) expanded_results: list[ExcerptResult] = [] for result in base_results: - # Expand the excerpt with context ctx_start = max(0, result.start_index - context_words) ctx_end = min(len(all_words), result.end_index + context_words) context_excerpt_words = all_words[ctx_start:ctx_end] @@ -198,6 +199,40 @@ def find_best_excerpt_with_context( return expanded_results +def find_best_excerpt_with_context( + text: str, + target_words: Sequence[str], + excerpt_length: int, + options: ExcerptSearchOptions | None = None, +) -> list[ExcerptResult]: + """Find the excerpt(s) with optional surrounding context. + + Args: + text: The input text to search. + target_words: Words to search for in the excerpt. + excerpt_length: Length of the excerpt in words. + options: Search options (case_sensitive, top_n, context_words). + + Returns: + List of ExcerptResult with context included in the excerpt. + """ + opts = options or ExcerptSearchOptions() + base_results = find_best_excerpt( + text, + target_words, + excerpt_length, + case_sensitive=opts.case_sensitive, + top_n=opts.top_n, + ) + + if opts.context_words <= 0: + return base_results + + return _expand_results_with_context( + text, base_results, opts.context_words, case_sensitive=opts.case_sensitive + ) + + def format_excerpt_results( results: list[ExcerptResult], target_words: Sequence[str], @@ -224,7 +259,8 @@ def format_excerpt_results( lines.append(f'Excerpt: "{result.excerpt}"') lines.append(f"Word position: {result.start_index} - {result.end_index - 1}") lines.append( - f"Matches: {result.match_count}/{len(result.words)} ({result.match_percentage:.2f}%)" + f"Matches: {result.match_count}/{len(result.words)}" + f" ({result.match_percentage:.2f}%)" ) lines.append("") @@ -316,10 +352,7 @@ def main(argv: Sequence[str] | None = None) -> int: try: # Get input text - if args.text: - text = args.text - else: - text = read_file(args.file) + text = args.text or read_file(args.file) # Get target words if args.words: @@ -329,7 +362,7 @@ def main(argv: Sequence[str] | None = None) -> int: target_words = [w.strip() for w in words_content.splitlines() if w.strip()] if not target_words: - print("Error: No target words provided", file=sys.stderr) + logger.error("No target words provided") return 1 # Find excerpts @@ -337,9 +370,11 @@ def main(argv: Sequence[str] | None = None) -> int: text, target_words, args.length, - case_sensitive=args.case_sensitive, - top_n=args.top, - context_words=args.context, + ExcerptSearchOptions( + case_sensitive=args.case_sensitive, + top_n=args.top, + context_words=args.context, + ), ) # Format and print results @@ -347,15 +382,15 @@ def main(argv: Sequence[str] | None = None) -> int: if args.output: Path(args.output).write_text(output, encoding="utf-8") - print(f"Output written to {args.output}") + logger.info("Output written to %s", args.output) else: - print(output) + logger.info("%s", output) - except FileNotFoundError as e: - print(f"Error: File not found - {e}", file=sys.stderr) + except FileNotFoundError: + logger.exception("File not found") return 1 - except UnicodeDecodeError as e: - print(f"Error: Could not decode file as UTF-8 - {e}", file=sys.stderr) + except UnicodeDecodeError: + logger.exception("Could not decode file as UTF-8") return 1 return 0 diff --git a/python_pkg/word_frequency/learning_pipe.py b/python_pkg/word_frequency/learning_pipe.py index 0bbd253..2d788a2 100755 --- a/python_pkg/word_frequency/learning_pipe.py +++ b/python_pkg/word_frequency/learning_pipe.py @@ -1,7 +1,8 @@ #!/usr/bin/env python3 -"""Learning pipe - combines word frequency analysis with excerpt finding for language learning. +r"""Learning pipe - combines word frequency analysis with excerpt finding. + +Helps language learners by: -This script helps language learners by: 1. Analyzing a text to find the most common words 2. Finding excerpts where those common words are most prevalent 3. Creating a progressive learning experience in batches @@ -11,26 +12,35 @@ The idea is to: - Then read excerpts that are dense with those words - Progressively learn more words and more complex excerpts -Usage: - # Basic usage - get top 20 words and find excerpts with them - python -m python_pkg.word_frequency.learning_pipe --file text.txt +Usage:: + + # Basic usage + python -m python_pkg.word_frequency.learning_pipe \\ + --file text.txt # Custom batch size and excerpt length - python -m python_pkg.word_frequency.learning_pipe --file text.txt --batch-size 30 --excerpt-length 50 + python -m python_pkg.word_frequency.learning_pipe \\ + --file text.txt --batch-size 30 --excerpt-length 50 # Multiple batches for progressive learning - python -m python_pkg.word_frequency.learning_pipe --file text.txt --batches 5 --batch-size 20 + python -m python_pkg.word_frequency.learning_pipe \\ + --file text.txt --batches 5 --batch-size 20 # Output to file - python -m python_pkg.word_frequency.learning_pipe --file text.txt --output lesson.txt + python -m python_pkg.word_frequency.learning_pipe \\ + --file text.txt --output lesson.txt - # Skip common words (like "the", "a", "is") using a stopwords file - python -m python_pkg.word_frequency.learning_pipe --file text.txt --stopwords stopwords.txt + # Skip common words using a stopwords file + python -m python_pkg.word_frequency.learning_pipe \\ + --file text.txt --stopwords stopwords.txt """ from __future__ import annotations import argparse +from dataclasses import dataclass +from dataclasses import replace as _replace_dc +import logging from pathlib import Path import sys from typing import TYPE_CHECKING @@ -53,6 +63,8 @@ except ModuleNotFoundError: if TYPE_CHECKING: from collections.abc import Sequence +logger = logging.getLogger(__name__) + # Common stopwords for various languages (can be overridden with --stopwords) DEFAULT_STOPWORDS_EN = frozenset( @@ -181,57 +193,210 @@ def load_stopwords(filepath: str | Path | None) -> frozenset[str]: ) +@dataclass(frozen=True) +class LessonConfig: + """Configuration for learning lesson generation.""" + + batch_size: int = 20 + num_batches: int = 1 + excerpt_length: int = 30 + excerpts_per_batch: int = 3 + stopwords: frozenset[str] | None = None + skip_default_stopwords: bool = False + skip_numbers: bool = True + case_sensitive: bool = False + translate_from: str | None = None + translate_to: str | None = None + + +def _resolve_stopwords(config: LessonConfig) -> frozenset[str]: + """Resolve combined stopwords from config.""" + if config.skip_default_stopwords: + return config.stopwords or frozenset() + return DEFAULT_STOPWORDS_EN | (config.stopwords or frozenset()) + + +def _detect_translation_language( + text: str, + config: LessonConfig, + lines: list[str], +) -> tuple[str | None, str | None]: + """Detect translation settings and return (from, to) pair.""" + actual_from = config.translate_from + actual_to = config.translate_to or "en" + + if actual_from == "auto" or ( + config.translate_to and not config.translate_from + ): + detected = detect_language(text) + if detected: + actual_from = detected + lines.append(f"Detected language: {detected}") + else: + lines.append( + "Warning: Could not detect language " + "(install langdetect: " + "pip install langdetect)" + ) + actual_from = None + + return actual_from, actual_to + + +def _format_word_list( + batch_words: list[tuple[str, int]], + start_idx: int, + total_words: int, + translations: dict[str, str], +) -> list[str]: + """Format the vocabulary word list for a batch.""" + lines: list[str] = [] + for i, (word, count) in enumerate( + batch_words, start=start_idx + 1, + ): + percentage = (count / total_words) * 100 + if translations: + trans = translations.get(word, "?") + lines.append( + f" {i:3}. {word:<20} -> {trans:<20}" + f" ({count:,} occurrences, " + f"{percentage:.2f}%)" + ) + else: + lines.append( + f" {i:3}. {word:<20}" + f" ({count:,} occurrences, " + f"{percentage:.2f}%)" + ) + return lines + + +@dataclass(frozen=True) +class _LessonContext: + """Shared context for batch generation.""" + + text: str + word_counts: dict[str, int] + config: LessonConfig + + +def _generate_batch_section( + ctx: _LessonContext, + batch_num: int, + batch_words: list[tuple[str, int]], + cumulative_words: list[str], +) -> list[str]: + """Generate lines for a single batch section.""" + config = ctx.config + total_words = sum(ctx.word_counts.values()) + start_idx = batch_num * config.batch_size + end_idx = start_idx + config.batch_size + + lines: list[str] = [] + lines.append("-" * 70) + lines.append( + f"BATCH {batch_num + 1}: Words " + f"{start_idx + 1} - " + f"{min(end_idx, start_idx + len(batch_words))}" + ) + lines.append("-" * 70) + lines.append("") + + # Get translations if requested + translations: dict[str, str] = {} + do_translate = ( + config.translate_from is not None + and config.translate_to is not None + ) + if do_translate: + words_to_translate = [word for word, _ in batch_words] + translation_results = translate_words_batch( + words_to_translate, + config.translate_from, # type: ignore[arg-type] + config.translate_to, # type: ignore[arg-type] + ) + translations = { + r.source_word: r.translated_word + for r in translation_results + if r.success + } + + lines.append("VOCABULARY TO LEARN:") + lines.append("") + lines.extend( + _format_word_list( + batch_words, start_idx, total_words, translations, + ) + ) + lines.append("") + + # Cumulative coverage + cumulative_count = sum( + ctx.word_counts[w] + for w in cumulative_words + if w in ctx.word_counts + ) + coverage = (cumulative_count / total_words) * 100 + lines.append( + "After learning these words, " + f"you'll recognize ~{coverage:.1f}% of the text" + ) + lines.append("") + + # Excerpts + lines.append("PRACTICE EXCERPTS:") + lines.append( + "(Excerpts where your learned vocabulary " + "is most concentrated)" + ) + lines.append("") + + excerpts = find_best_excerpt( + ctx.text, + cumulative_words, + config.excerpt_length, + case_sensitive=config.case_sensitive, + top_n=config.excerpts_per_batch, + ) + + for j, excerpt in enumerate(excerpts, 1): + lines.append( + f" Excerpt {j} " + f"({excerpt.match_percentage:.1f}% known words):" + ) + lines.append(f' "{excerpt.excerpt}"') + lines.append("") + + return lines + + def generate_learning_lesson( text: str, - *, - batch_size: int = 20, - num_batches: int = 1, - excerpt_length: int = 30, - excerpts_per_batch: int = 3, - stopwords: frozenset[str] | None = None, - skip_default_stopwords: bool = False, - skip_numbers: bool = True, - case_sensitive: bool = False, - context_words: int = 5, - translate_from: str | None = None, - translate_to: str | None = None, + config: LessonConfig | None = None, ) -> str: """Generate a learning lesson from text. Args: text: The source text to analyze. - batch_size: Number of words per learning batch. - num_batches: Number of batches to generate. - excerpt_length: Length of each excerpt in words. - excerpts_per_batch: Number of excerpts to find per batch. - stopwords: Custom stopwords to skip (in addition to defaults). - skip_default_stopwords: If True, don't filter out default English stopwords. - skip_numbers: If True, filter out numeric words (default: True). - case_sensitive: If True, treat words case-sensitively. - context_words: Words of context to include around excerpts. - translate_from: Source language code for translation (e.g., 'la', 'pl'). - translate_to: Target language code for translation (e.g., 'en'). + config: Lesson configuration. Uses defaults if None. Returns: Formatted learning lesson as a string. """ - # Combine stopwords - all_stopwords: frozenset[str] - if skip_default_stopwords: - all_stopwords = stopwords or frozenset() - else: - all_stopwords = DEFAULT_STOPWORDS_EN | (stopwords or frozenset()) + if config is None: + config = LessonConfig() - # Analyze text for word frequencies - word_counts = analyze_text(text, case_sensitive=case_sensitive) + all_stopwords = _resolve_stopwords(config) + word_counts = analyze_text( + text, case_sensitive=config.case_sensitive, + ) - # Filter out stopwords and get sorted words filtered_words = [ (word, count) for word, count in word_counts.most_common() if word.lower() not in all_stopwords and len(word) > 1 - and not (skip_numbers and word.isdigit()) + and not (config.skip_numbers and word.isdigit()) ] total_words = sum(word_counts.values()) @@ -241,125 +406,62 @@ def generate_learning_lesson( lines.append("LANGUAGE LEARNING LESSON") lines.append("=" * 70) lines.append( - f"Source text: {total_words:,} total words, {len(word_counts):,} unique words" + f"Source text: {total_words:,} total words, " + f"{len(word_counts):,} unique words" ) if all_stopwords: lines.append( - f"After filtering {len(all_stopwords)} stopwords: {len(filtered_words):,} vocabulary words" + f"After filtering {len(all_stopwords)} " + f"stopwords: {len(filtered_words):,} " + "vocabulary words" ) else: - lines.append(f"Vocabulary words: {len(filtered_words):,}") + lines.append( + f"Vocabulary words: {len(filtered_words):,}", + ) - # Handle translation setup - actual_translate_from = translate_from - actual_translate_to = translate_to or "en" # Default to English - - # Auto-detect language if translation is enabled but source not specified - if translate_from == "auto" or (translate_to and not translate_from): - detected = detect_language(text) - if detected: - actual_translate_from = detected - lines.append(f"Detected language: {detected}") - # Note: langdetect doesn't support Latin (often detected as Italian) - # If detection seems wrong, use --translate-from to override - else: - lines.append( - "Warning: Could not detect language " - "(install langdetect: pip install langdetect)" - ) - actual_translate_from = None - - do_translate = actual_translate_from is not None and actual_translate_to is not None + actual_from, actual_to = _detect_translation_language( + text, config, lines, + ) + do_translate = ( + actual_from is not None and actual_to is not None + ) if do_translate: - lines.append(f"Translation: {actual_translate_from} -> {actual_translate_to}") - + lines.append( + f"Translation: {actual_from} -> {actual_to}", + ) lines.append("") - # Generate batches + # Create resolved config with detected translation + resolved_config = _replace_dc( + config, + translate_from=actual_from, + translate_to=actual_to, + ) + ctx = _LessonContext( + text=text, + word_counts=word_counts, + config=resolved_config, + ) + cumulative_words: list[str] = [] - - for batch_num in range(num_batches): - start_idx = batch_num * batch_size - end_idx = start_idx + batch_size - + for batch_num in range(config.num_batches): + start_idx = batch_num * config.batch_size + end_idx = start_idx + config.batch_size if start_idx >= len(filtered_words): break batch_words = filtered_words[start_idx:end_idx] cumulative_words.extend(word for word, _ in batch_words) - lines.append("-" * 70) - lines.append( - f"BATCH {batch_num + 1}: Words {start_idx + 1} - {min(end_idx, len(filtered_words))}" - ) - lines.append("-" * 70) - lines.append("") - - # Get translations if requested - translations: dict[str, str] = {} - if do_translate: - words_to_translate = [word for word, _ in batch_words] - translation_results = translate_words_batch( - words_to_translate, - actual_translate_from, # type: ignore[arg-type] - actual_translate_to, # type: ignore[arg-type] + lines.extend( + _generate_batch_section( + ctx, + batch_num, + batch_words, + cumulative_words, ) - translations = { - r.source_word: r.translated_word - for r in translation_results - if r.success - } - - # Word list with frequencies - lines.append("VOCABULARY TO LEARN:") - lines.append("") - - if do_translate and translations: - # Include translations in output - for i, (word, count) in enumerate(batch_words, start=start_idx + 1): - percentage = (count / total_words) * 100 - trans = translations.get(word, "?") - lines.append( - f" {i:3}. {word:<20} -> {trans:<20} ({count:,} occurrences, {percentage:.2f}%)" - ) - else: - for i, (word, count) in enumerate(batch_words, start=start_idx + 1): - percentage = (count / total_words) * 100 - lines.append( - f" {i:3}. {word:<20} ({count:,} occurrences, {percentage:.2f}%)" - ) - - lines.append("") - - # Calculate cumulative coverage - cumulative_count = sum( - word_counts[word] for word in cumulative_words if word in word_counts ) - coverage = (cumulative_count / total_words) * 100 - lines.append( - f"After learning these words, you'll recognize ~{coverage:.1f}% of the text" - ) - lines.append("") - - # Find excerpts using cumulative words - lines.append("PRACTICE EXCERPTS:") - lines.append("(Excerpts where your learned vocabulary is most concentrated)") - lines.append("") - - excerpts = find_best_excerpt( - text, - cumulative_words, - excerpt_length, - case_sensitive=case_sensitive, - top_n=excerpts_per_batch, - ) - - for j, excerpt in enumerate(excerpts, 1): - lines.append( - f" Excerpt {j} ({excerpt.match_percentage:.1f}% known words):" - ) - lines.append(f' "{excerpt.excerpt}"') - lines.append("") # Summary lines.append("=" * 70) @@ -368,14 +470,25 @@ def generate_learning_lesson( if cumulative_words: final_coverage = sum( - word_counts[word] for word in cumulative_words if word in word_counts + word_counts[w] + for w in cumulative_words + if w in word_counts ) - final_percentage = (final_coverage / total_words) * 100 - lines.append(f"Total vocabulary words learned: {len(cumulative_words)}") - lines.append(f"Text coverage: {final_percentage:.1f}%") + final_pct = (final_coverage / total_words) * 100 + lines.append( + "Total vocabulary words learned: " + f"{len(cumulative_words)}" + ) + lines.append(f"Text coverage: {final_pct:.1f}%") lines.append("") - lines.append("TIP: Focus on understanding the excerpts first, then read") - lines.append("more of the original text as your vocabulary grows!") + lines.append( + "TIP: Focus on understanding the excerpts " + "first, then read" + ) + lines.append( + "more of the original text as your " + "vocabulary grows!" + ) return "\n".join(lines) @@ -475,7 +588,10 @@ def main(argv: Sequence[str] | None = None) -> int: "--translate-from", type=str, metavar="LANG", - help="Source language code (e.g., 'la', 'pl', 'de'). If omitted, auto-detected.", + help=( + "Source language code (e.g., 'la', 'pl'). " + "If omitted, auto-detected." + ), ) parser.add_argument( "--translate-to", @@ -496,27 +612,22 @@ def main(argv: Sequence[str] | None = None) -> int: args = parser.parse_args(argv) try: - # Get input text - if args.text: - text = args.text - else: - text = read_file(args.file) + text = args.text or read_file(args.file) # Load custom stopwords if provided custom_stopwords = load_stopwords(args.stopwords) # Determine translation settings - # Translation enabled by default, --no-translate disables it translate_from: str | None = None translate_to: str | None = None if not args.no_translate: - translate_from = args.translate_from or "auto" # "auto" triggers detection + translate_from = ( + args.translate_from or "auto" + ) translate_to = args.translate_to - # Generate lesson - lesson = generate_learning_lesson( - text, + config = LessonConfig( batch_size=args.batch_size, num_batches=args.batches, excerpt_length=args.excerpt_length, @@ -528,19 +639,26 @@ def main(argv: Sequence[str] | None = None) -> int: translate_from=translate_from, translate_to=translate_to, ) + lesson = generate_learning_lesson(text, config) # Output if args.output: - Path(args.output).write_text(lesson, encoding="utf-8") - print(f"Lesson written to {args.output}") + Path(args.output).write_text( + lesson, encoding="utf-8", + ) + logger.info( + "Lesson written to %s", args.output, + ) else: - print(lesson) + logger.info(lesson) - except FileNotFoundError as e: - print(f"Error: File not found - {e}", file=sys.stderr) + except FileNotFoundError: + logger.exception("Error: File not found") return 1 - except UnicodeDecodeError as e: - print(f"Error: Could not decode file as UTF-8 - {e}", file=sys.stderr) + except UnicodeDecodeError: + logger.exception( + "Error: Could not decode file as UTF-8", + ) return 1 return 0 diff --git a/python_pkg/word_frequency/tests/test_analyzer.py b/python_pkg/word_frequency/tests/test_analyzer.py index 7ed1137..4b01593 100644 --- a/python_pkg/word_frequency/tests/test_analyzer.py +++ b/python_pkg/word_frequency/tests/test_analyzer.py @@ -3,8 +3,11 @@ from __future__ import annotations from collections import Counter -from pathlib import Path import time +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pathlib import Path import pytest @@ -251,12 +254,13 @@ class TestMain: assert exit_code == 0 assert "Unique words: 3" in captured.out - def test_file_not_found_error(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_file_not_found_error( + self, caplog: pytest.LogCaptureFixture + ) -> None: """Test error handling for missing file.""" exit_code = main(["--file", "/nonexistent/file.txt"]) - captured = capsys.readouterr() assert exit_code == 1 - assert "Error" in captured.err + assert "File not found" in caplog.text class TestPerformance: @@ -283,7 +287,7 @@ class TestPerformance: assert elapsed < 10.0, f"Analysis took {elapsed:.2f}s, expected < 10s" assert "word0" in result # Most common word should be present - def test_bible_sized_text_performance(self, tmp_path: Path) -> None: + def test_bible_sized_text_performance(self) -> None: """Test with Bible-sized text (~800k words).""" # Generate text similar in size to the Bible base_words = ["the", "and", "of", "to", "in", "a", "that", "is", "was", "for"] diff --git a/python_pkg/word_frequency/tests/test_anki_generator.py b/python_pkg/word_frequency/tests/test_anki_generator.py index b7c0c69..ff421a9 100755 --- a/python_pkg/word_frequency/tests/test_anki_generator.py +++ b/python_pkg/word_frequency/tests/test_anki_generator.py @@ -10,6 +10,7 @@ import pytest try: from python_pkg.word_frequency.anki_generator import ( + DeckInput, find_word_contexts, generate_anki_deck, main, @@ -20,6 +21,7 @@ except ImportError: sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent)) from python_pkg.word_frequency.anki_generator import ( + DeckInput, find_word_contexts, generate_anki_deck, main, @@ -77,7 +79,7 @@ class TestParseVocabularyCurveOutput: def test_parse_length_1(self, sample_vocabulary_output: str) -> None: """Test parsing output for length 1.""" - excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output( + excerpt, excerpt_words, _all_vocab = parse_vocabulary_curve_output( sample_vocabulary_output, 1 ) assert excerpt == "the" @@ -85,7 +87,7 @@ class TestParseVocabularyCurveOutput: def test_parse_length_2(self, sample_vocabulary_output: str) -> None: """Test parsing output for length 2.""" - excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output( + excerpt, excerpt_words, _all_vocab = parse_vocabulary_curve_output( sample_vocabulary_output, 2 ) assert excerpt == "the dog" @@ -93,7 +95,7 @@ class TestParseVocabularyCurveOutput: def test_parse_length_3(self, sample_vocabulary_output: str) -> None: """Test parsing output for length 3.""" - excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output( + excerpt, excerpt_words, _all_vocab = parse_vocabulary_curve_output( sample_vocabulary_output, 3 ) assert excerpt == "the quick fox" @@ -104,7 +106,7 @@ class TestParseVocabularyCurveOutput: def test_parse_nonexistent_length(self, sample_vocabulary_output: str) -> None: """Test parsing output for non-existent length.""" - excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output( + excerpt, excerpt_words, _all_vocab = parse_vocabulary_curve_output( sample_vocabulary_output, 100 ) assert excerpt == "" @@ -121,7 +123,7 @@ hello;1 world;2 VOCAB_DUMP_END """ - excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output(output, 2) + _excerpt, _excerpt_words, all_vocab = parse_vocabulary_curve_output(output, 2) assert all_vocab == [("hello", 1), ("world", 2)] @@ -168,10 +170,12 @@ class TestGenerateAnkiDeck: MagicMock(success=True, source_word="hello", translated_word="hola") ] result = generate_anki_deck( - [("hello", 1)], - source_lang="en", - target_lang="es", - deck_name="TestDeck", + DeckInput( + words_with_ranks=[("hello", 1)], + source_lang="en", + target_lang="es", + deck_name="TestDeck", + ), ) assert "#separator:semicolon" in result @@ -188,9 +192,11 @@ class TestGenerateAnkiDeck: MagicMock(success=True, source_word="world", translated_word="mundo"), ] result = generate_anki_deck( - [("hello", 1), ("world", 2)], - source_lang="en", - target_lang="es", + DeckInput( + words_with_ranks=[("hello", 1), ("world", 2)], + source_lang="en", + target_lang="es", + ), ) # Check that words and translations are present @@ -208,9 +214,11 @@ class TestGenerateAnkiDeck: MagicMock(success=True, source_word="test", translated_word="prueba") ] result = generate_anki_deck( - [("test", 42)], - source_lang="en", - target_lang="es", + DeckInput( + words_with_ranks=[("test", 42)], + source_lang="en", + target_lang="es", + ), ) assert "#42" in result @@ -226,9 +234,11 @@ class TestGenerateAnkiDeck: ) ] result = generate_anki_deck( - [("test;word", 1)], - source_lang="en", - target_lang="es", + DeckInput( + words_with_ranks=[("test;word", 1)], + source_lang="en", + target_lang="es", + ), ) # Semicolons should be replaced with commas @@ -244,10 +254,12 @@ class TestGenerateAnkiDeck: ] contexts = {"hello": "...say hello to..."} result = generate_anki_deck( - [("hello", 1)], - source_lang="en", - target_lang="es", - contexts=contexts, + DeckInput( + words_with_ranks=[("hello", 1)], + source_lang="en", + target_lang="es", + contexts=contexts, + ), include_context=True, ) @@ -257,9 +269,11 @@ class TestGenerateAnkiDeck: def test_no_translate_flag(self) -> None: """Test that no_translate skips translation.""" result = generate_anki_deck( - [("hello", 1), ("world", 2)], - source_lang="en", - target_lang="es", + DeckInput( + words_with_ranks=[("hello", 1), ("world", 2)], + source_lang="en", + target_lang="es", + ), no_translate=True, ) @@ -280,7 +294,7 @@ class TestMain: result = main(["--file", "nonexistent.txt", "--length", "10"]) assert result == 1 - def test_help_flag(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_help_flag(self) -> None: """Test that --help works.""" with pytest.raises(SystemExit) as exc_info: main(["--help"]) @@ -309,7 +323,7 @@ class TestIntegration: ) as mock_translate: # Mock translation to avoid network calls def mock_translate_fn( - words: list[str], from_lang: str, to_lang: str + words: list[str], _from_lang: str, _to_lang: str ) -> list[MagicMock]: return [ MagicMock(success=True, source_word=w, translated_word=f"[{w}]") @@ -324,6 +338,8 @@ class TestIntegration: str(sample_text_file), "--length", "5", + "--from", + "en", "--output", str(output_file), "--quiet", @@ -337,9 +353,11 @@ class TestIntegration: assert "#separator:semicolon" in content def test_cli_with_sample_file( - self, sample_text_file: Path, tmp_path: Path, capsys: pytest.CaptureFixture[str] + self, sample_text_file: Path, tmp_path: Path, caplog: pytest.LogCaptureFixture ) -> None: """Test CLI with actual file.""" + import logging + from python_pkg.word_frequency.anki_generator import C_EXECUTABLE if not C_EXECUTABLE.exists(): @@ -347,9 +365,12 @@ class TestIntegration: output_file = tmp_path / "anki_output.txt" - with patch( - "python_pkg.word_frequency.anki_generator.translate_words_batch" - ) as mock_translate: + with ( + caplog.at_level(logging.INFO), + patch( + "python_pkg.word_frequency.anki_generator.translate_words_batch" + ) as mock_translate, + ): mock_translate.return_value = [ MagicMock(success=True, source_word="the", translated_word="le") ] @@ -360,14 +381,15 @@ class TestIntegration: str(sample_text_file), "--length", "1", + "--from", + "en", "--output", str(output_file), ] ) assert result == 0 - captured = capsys.readouterr() - assert "FLASHCARD GENERATION COMPLETE" in captured.out + assert "FLASHCARD GENERATION COMPLETE" in caplog.text if __name__ == "__main__": diff --git a/python_pkg/word_frequency/tests/test_excerpt_finder.py b/python_pkg/word_frequency/tests/test_excerpt_finder.py index 4ec179d..2cdaea3 100644 --- a/python_pkg/word_frequency/tests/test_excerpt_finder.py +++ b/python_pkg/word_frequency/tests/test_excerpt_finder.py @@ -2,13 +2,18 @@ from __future__ import annotations -from pathlib import Path +import logging import time +from typing import TYPE_CHECKING import pytest +if TYPE_CHECKING: + from pathlib import Path + from python_pkg.word_frequency.excerpt_finder import ( ExcerptResult, + ExcerptSearchOptions, find_best_excerpt, find_best_excerpt_with_context, format_excerpt_results, @@ -146,7 +151,8 @@ class TestFindBestExcerptWithContext: """Test with zero context (should behave like find_best_excerpt).""" text = "a b c d e f g" result = find_best_excerpt_with_context( - text, ["c"], excerpt_length=1, context_words=0 + text, ["c"], excerpt_length=1, + options=ExcerptSearchOptions(context_words=0), ) assert result[0].excerpt == "c" @@ -155,7 +161,8 @@ class TestFindBestExcerptWithContext: """Test with context words.""" text = "a b c d e f g" result = find_best_excerpt_with_context( - text, ["d"], excerpt_length=1, context_words=2 + text, ["d"], excerpt_length=1, + options=ExcerptSearchOptions(context_words=2), ) # "d" at index 3, with context should include 2 words before and after @@ -167,7 +174,8 @@ class TestFindBestExcerptWithContext: """Test context doesn't go before start of text.""" text = "a b c d e" result = find_best_excerpt_with_context( - text, ["a"], excerpt_length=1, context_words=3 + text, ["a"], excerpt_length=1, + options=ExcerptSearchOptions(context_words=3), ) # Can't go before "a", so just get words after @@ -178,7 +186,8 @@ class TestFindBestExcerptWithContext: """Test context doesn't go beyond end of text.""" text = "a b c d e" result = find_best_excerpt_with_context( - text, ["e"], excerpt_length=1, context_words=3 + text, ["e"], excerpt_length=1, + options=ExcerptSearchOptions(context_words=3), ) # Can't go beyond "e" @@ -240,33 +249,33 @@ class TestFormatExcerptResults: class TestMain: """Tests for main CLI function.""" - def test_text_and_words_input(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_text_and_words_input(self, caplog: pytest.LogCaptureFixture) -> None: """Test --text and --words options.""" - exit_code = main( - ["--text", "hello world hello", "--words", "hello", "--length", "2"] - ) - captured = capsys.readouterr() + with caplog.at_level(logging.INFO): + exit_code = main( + ["--text", "hello world hello", "--words", "hello", "--length", "2"] + ) assert exit_code == 0 - assert "hello" in captured.out + assert "hello" in caplog.text def test_file_input( - self, tmp_path: Path, capsys: pytest.CaptureFixture[str] + self, tmp_path: Path, caplog: pytest.LogCaptureFixture ) -> None: """Test --file input option.""" test_file = tmp_path / "test.txt" test_file.write_text("hello world hello world", encoding="utf-8") - exit_code = main( - ["--file", str(test_file), "--words", "hello", "--length", "2"] - ) - captured = capsys.readouterr() + with caplog.at_level(logging.INFO): + exit_code = main( + ["--file", str(test_file), "--words", "hello", "--length", "2"] + ) assert exit_code == 0 - assert "hello" in captured.out + assert "hello" in caplog.text def test_words_file_input( - self, tmp_path: Path, capsys: pytest.CaptureFixture[str] + self, tmp_path: Path, caplog: pytest.LogCaptureFixture ) -> None: """Test --words-file option.""" text_file = tmp_path / "text.txt" @@ -274,91 +283,91 @@ class TestMain: text_file.write_text("hello world hello world", encoding="utf-8") words_file.write_text("hello\nworld\n", encoding="utf-8") - exit_code = main( - [ - "--file", - str(text_file), - "--words-file", - str(words_file), - "--length", - "2", - ] - ) - captured = capsys.readouterr() + with caplog.at_level(logging.INFO): + exit_code = main( + [ + "--file", + str(text_file), + "--words-file", + str(words_file), + "--length", + "2", + ] + ) assert exit_code == 0 - assert "100.00%" in captured.out # Both words match + assert "100.00%" in caplog.text # Both words match - def test_top_option(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_top_option(self, caplog: pytest.LogCaptureFixture) -> None: """Test --top option.""" - exit_code = main( - [ - "--text", - "a b c d e f", - "--words", - "a", - "b", - "--length", - "2", - "--top", - "3", - ] - ) - captured = capsys.readouterr() + with caplog.at_level(logging.INFO): + exit_code = main( + [ + "--text", + "a b c d e f", + "--words", + "a", + "b", + "--length", + "2", + "--top", + "3", + ] + ) assert exit_code == 0 # Should show multiple results - assert "Result #1" in captured.out + assert "Result #1" in caplog.text - def test_context_option(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_context_option(self, caplog: pytest.LogCaptureFixture) -> None: """Test --context option.""" - exit_code = main( - [ - "--text", - "a b c d e f g", - "--words", - "d", - "--length", - "1", - "--context", - "2", - ] - ) - capsys.readouterr() + with caplog.at_level(logging.INFO): + exit_code = main( + [ + "--text", + "a b c d e f g", + "--words", + "d", + "--length", + "1", + "--context", + "2", + ] + ) assert exit_code == 0 # Excerpt should include context words - def test_case_sensitive_option(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_case_sensitive_option(self, caplog: pytest.LogCaptureFixture) -> None: """Test --case-sensitive option.""" - exit_code = main( - [ - "--text", - "Hello HELLO hello", - "--words", - "hello", - "--length", - "1", - "--case-sensitive", - ] - ) - capsys.readouterr() + with caplog.at_level(logging.INFO): + exit_code = main( + [ + "--text", + "Hello HELLO hello", + "--words", + "hello", + "--length", + "1", + "--case-sensitive", + ] + ) assert exit_code == 0 # Only lowercase "hello" should match - def test_file_not_found(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_file_not_found(self, caplog: pytest.LogCaptureFixture) -> None: """Test error handling for missing file.""" - exit_code = main( - ["--file", "/nonexistent/file.txt", "--words", "hello", "--length", "2"] - ) - captured = capsys.readouterr() + with caplog.at_level(logging.ERROR): + exit_code = main( + ["--file", "/nonexistent/file.txt", "--words", "hello", "--length", "2"] + ) assert exit_code == 1 - assert "Error" in captured.err + assert "Error" in caplog.text def test_empty_words_file( - self, tmp_path: Path, capsys: pytest.CaptureFixture[str] + self, tmp_path: Path, caplog: pytest.LogCaptureFixture ) -> None: """Test error when words file is empty.""" text_file = tmp_path / "text.txt" @@ -366,20 +375,20 @@ class TestMain: text_file.write_text("hello world", encoding="utf-8") words_file.write_text("", encoding="utf-8") - exit_code = main( - [ - "--file", - str(text_file), - "--words-file", - str(words_file), - "--length", - "2", - ] - ) - captured = capsys.readouterr() + with caplog.at_level(logging.ERROR): + exit_code = main( + [ + "--file", + str(text_file), + "--words-file", + str(words_file), + "--length", + "2", + ] + ) assert exit_code == 1 - assert "No target words" in captured.err + assert "No target words" in caplog.text class TestPerformance: diff --git a/python_pkg/word_frequency/tests/test_learning_pipe.py b/python_pkg/word_frequency/tests/test_learning_pipe.py index bfbb7a5..1444c32 100644 --- a/python_pkg/word_frequency/tests/test_learning_pipe.py +++ b/python_pkg/word_frequency/tests/test_learning_pipe.py @@ -2,16 +2,20 @@ from __future__ import annotations -from pathlib import Path +import logging import time from typing import TYPE_CHECKING from unittest.mock import MagicMock, patch import pytest +if TYPE_CHECKING: + from pathlib import Path + import python_pkg.word_frequency.learning_pipe as learning_pipe_module from python_pkg.word_frequency.learning_pipe import ( DEFAULT_STOPWORDS_EN, + LessonConfig, generate_learning_lesson, load_stopwords, main, @@ -23,7 +27,7 @@ if TYPE_CHECKING: @pytest.fixture -def mock_translation() -> Generator[MagicMock, None, None]: +def _mock_translation() -> Generator[MagicMock, None, None]: """Mock translation to avoid requiring argostranslate.""" def fake_batch_translate( @@ -31,7 +35,7 @@ def mock_translation() -> Generator[MagicMock, None, None]: from_lang: str, to_lang: str, *, - use_cache: bool = True, + _use_cache: bool = True, ) -> list[TranslationResult]: """Fake batch translation that returns word with prefix.""" return [ @@ -95,7 +99,7 @@ class TestGenerateLearningLesson: """Test basic lesson generation.""" text = "hello world hello hello world test test test test" result = generate_learning_lesson( - text, batch_size=3, num_batches=1, skip_default_stopwords=True + text, LessonConfig(batch_size=3, num_batches=1, skip_default_stopwords=True) ) assert "LANGUAGE LEARNING LESSON" in result @@ -106,7 +110,7 @@ class TestGenerateLearningLesson: """Test generation with multiple batches.""" text = " ".join(f"word{i}" * (100 - i) for i in range(20)) result = generate_learning_lesson( - text, batch_size=5, num_batches=3, skip_default_stopwords=True + text, LessonConfig(batch_size=5, num_batches=3, skip_default_stopwords=True) ) assert "BATCH 1" in result @@ -116,7 +120,9 @@ class TestGenerateLearningLesson: def test_stopwords_filtering(self) -> None: """Test that default stopwords are filtered.""" text = "the the the hello world" - result = generate_learning_lesson(text, batch_size=5, num_batches=1) + result = generate_learning_lesson( + text, LessonConfig(batch_size=5, num_batches=1) + ) # "the" should be filtered, "hello" and "world" should appear lines = result.split("\n") @@ -139,7 +145,7 @@ class TestGenerateLearningLesson: """Test disabling default stopword filtering.""" text = "the the the hello" result = generate_learning_lesson( - text, batch_size=5, num_batches=1, skip_default_stopwords=True + text, LessonConfig(batch_size=5, num_batches=1, skip_default_stopwords=True) ) assert "the" in result.lower() @@ -148,7 +154,7 @@ class TestGenerateLearningLesson: """Test that numbers are filtered by default.""" text = "123 123 123 hello world" result = generate_learning_lesson( - text, batch_size=5, num_batches=1, skip_default_stopwords=True + text, LessonConfig(batch_size=5, num_batches=1, skip_default_stopwords=True) ) # Check vocabulary section doesn't include "123" @@ -162,10 +168,12 @@ class TestGenerateLearningLesson: text = "123 123 123 hello" result = generate_learning_lesson( text, - batch_size=5, - num_batches=1, - skip_default_stopwords=True, - skip_numbers=False, + LessonConfig( + batch_size=5, + num_batches=1, + skip_default_stopwords=True, + skip_numbers=False, + ), ) assert "123" in result @@ -174,7 +182,7 @@ class TestGenerateLearningLesson: """Test that coverage percentage is calculated.""" text = "hello hello hello world world test" result = generate_learning_lesson( - text, batch_size=3, num_batches=1, skip_default_stopwords=True + text, LessonConfig(batch_size=3, num_batches=1, skip_default_stopwords=True) ) assert "recognize" in result.lower() @@ -185,11 +193,13 @@ class TestGenerateLearningLesson: text = "hello world hello world hello world test test test" result = generate_learning_lesson( text, - batch_size=2, - num_batches=1, - excerpt_length=3, - excerpts_per_batch=2, - skip_default_stopwords=True, + LessonConfig( + batch_size=2, + num_batches=1, + excerpt_length=3, + excerpts_per_batch=2, + skip_default_stopwords=True, + ), ) assert "PRACTICE EXCERPTS" in result @@ -200,45 +210,45 @@ class TestMain: """Tests for main CLI function.""" def test_basic_text_input( - self, capsys: pytest.CaptureFixture[str], mock_translation: None + self, caplog: pytest.LogCaptureFixture, _mock_translation: None ) -> None: """Test with text input.""" - exit_code = main( - [ - "--text", - "hello world hello world test test test", - "--batch-size", - "3", - "--no-default-stopwords", - ] - ) - captured = capsys.readouterr() + with caplog.at_level(logging.INFO): + exit_code = main( + [ + "--text", + "hello world hello world test test test", + "--batch-size", + "3", + "--no-default-stopwords", + ] + ) assert exit_code == 0 - assert "LANGUAGE LEARNING LESSON" in captured.out + assert "LANGUAGE LEARNING LESSON" in caplog.text def test_file_input( - self, tmp_path: Path, capsys: pytest.CaptureFixture[str], mock_translation: None + self, tmp_path: Path, caplog: pytest.LogCaptureFixture, _mock_translation: None ) -> None: """Test with file input.""" test_file = tmp_path / "test.txt" test_file.write_text("hello world hello world test", encoding="utf-8") - exit_code = main( - [ - "--file", - str(test_file), - "--batch-size", - "3", - "--no-default-stopwords", - ] - ) - captured = capsys.readouterr() + with caplog.at_level(logging.INFO): + exit_code = main( + [ + "--file", + str(test_file), + "--batch-size", + "3", + "--no-default-stopwords", + ] + ) assert exit_code == 0 - assert "hello" in captured.out.lower() + assert "hello" in caplog.text.lower() - def test_output_to_file(self, tmp_path: Path, mock_translation: None) -> None: + def test_output_to_file(self, tmp_path: Path, _mock_translation: None) -> None: """Test outputting to file.""" output_file = tmp_path / "lesson.txt" @@ -258,7 +268,7 @@ class TestMain: assert "LANGUAGE LEARNING LESSON" in content def test_custom_stopwords( - self, tmp_path: Path, capsys: pytest.CaptureFixture[str], mock_translation: None + self, tmp_path: Path, _mock_translation: None ) -> None: """Test with custom stopwords file.""" stopwords_file = tmp_path / "stop.txt" @@ -275,41 +285,40 @@ class TestMain: "5", ] ) - capsys.readouterr() assert exit_code == 0 # "hello" should be filtered by custom stopwords def test_multiple_batches_option( - self, capsys: pytest.CaptureFixture[str], mock_translation: None + self, caplog: pytest.LogCaptureFixture, _mock_translation: None ) -> None: """Test --batches option.""" text = " ".join(f"word{i}" * (50 - i) for i in range(30)) - exit_code = main( - [ - "--text", - text, - "--batch-size", - "5", - "--batches", - "3", - "--no-default-stopwords", - ] - ) - captured = capsys.readouterr() + with caplog.at_level(logging.INFO): + exit_code = main( + [ + "--text", + text, + "--batch-size", + "5", + "--batches", + "3", + "--no-default-stopwords", + ] + ) assert exit_code == 0 - assert "BATCH 1" in captured.out - assert "BATCH 2" in captured.out - assert "BATCH 3" in captured.out + assert "BATCH 1" in caplog.text + assert "BATCH 2" in caplog.text + assert "BATCH 3" in caplog.text - def test_file_not_found(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_file_not_found(self, caplog: pytest.LogCaptureFixture) -> None: """Test error handling for missing file.""" - exit_code = main(["--file", "/nonexistent/file.txt"]) - captured = capsys.readouterr() + with caplog.at_level(logging.ERROR): + exit_code = main(["--file", "/nonexistent/file.txt"]) assert exit_code == 1 - assert "Error" in captured.err + assert "Error" in caplog.text class TestPerformance: @@ -324,10 +333,12 @@ class TestPerformance: start_time = time.perf_counter() result = generate_learning_lesson( large_text, - batch_size=50, - num_batches=5, - excerpt_length=30, - skip_default_stopwords=True, + LessonConfig( + batch_size=50, + num_batches=5, + excerpt_length=30, + skip_default_stopwords=True, + ), ) elapsed = time.perf_counter() - start_time @@ -358,9 +369,11 @@ class TestTranslationIntegration: text = "hello world hello world hello" result = generate_learning_lesson( text, - batch_size=5, - num_batches=1, - skip_default_stopwords=True, + LessonConfig( + batch_size=5, + num_batches=1, + skip_default_stopwords=True, + ), ) assert "hello" in result @@ -368,17 +381,19 @@ class TestTranslationIntegration: # Should not have translation arrows assert " -> " not in result or "Translation" not in result - def test_lesson_with_translation_params(self, mock_translation: None) -> None: + def test_lesson_with_translation_params(self, _mock_translation: None) -> None: """Test that translation params are accepted.""" text = "hello world hello world hello" # This should work with mocked translation result = generate_learning_lesson( text, - batch_size=5, - num_batches=1, - skip_default_stopwords=True, - translate_from="en", - translate_to="es", + LessonConfig( + batch_size=5, + num_batches=1, + skip_default_stopwords=True, + translate_from="en", + translate_to="es", + ), ) # The lesson should still be generated @@ -386,7 +401,7 @@ class TestTranslationIntegration: assert "hello" in result def test_main_with_translate_flags( - self, tmp_path: Path, mock_translation: None + self, tmp_path: Path, _mock_translation: None ) -> None: """Test that main accepts translation flags.""" text_file = tmp_path / "test.txt" @@ -408,36 +423,42 @@ class TestTranslationIntegration: assert result == 0 def test_translate_to_defaults_to_english( - self, capsys: pytest.CaptureFixture[str], mock_translation: None + self, _mock_translation: None ) -> None: """Test that translate_to defaults to 'en' when using auto-detection.""" text = "hello world" - # When using --translate flag (translate_from="auto"), translate_to defaults to "en" - result = generate_learning_lesson( - text, - batch_size=5, - num_batches=1, - skip_default_stopwords=True, - translate_from="auto", # Auto-detect source language - translate_to=None, # Should default to English - ) + # When using --translate flag (translate_from="auto"), + # translate_to defaults to "en" + with patch.object( + learning_pipe_module, "detect_language", return_value="es" + ): + result = generate_learning_lesson( + text, + LessonConfig( + batch_size=5, + num_batches=1, + skip_default_stopwords=True, + translate_from="auto", # Auto-detect source language + translate_to=None, # Should default to English + ), + ) # Should have translation output with auto-detected source -> en assert "Detected language:" in result assert " -> en" in result - def test_no_translation_when_both_none( - self, capsys: pytest.CaptureFixture[str] - ) -> None: - """Test no translation happens when both translate_from and translate_to are None.""" + def test_no_translation_when_both_none(self) -> None: + """Test no translation when both translate params are None.""" text = "hello world" result = generate_learning_lesson( text, - batch_size=5, - num_batches=1, - skip_default_stopwords=True, - translate_from=None, - translate_to=None, + LessonConfig( + batch_size=5, + num_batches=1, + skip_default_stopwords=True, + translate_from=None, + translate_to=None, + ), ) # Should not have translation output diff --git a/python_pkg/word_frequency/tests/test_translator.py b/python_pkg/word_frequency/tests/test_translator.py index 620aa4a..d3678f2 100644 --- a/python_pkg/word_frequency/tests/test_translator.py +++ b/python_pkg/word_frequency/tests/test_translator.py @@ -61,19 +61,16 @@ class ArgosAvailableMock: self.mock_translate_module = MagicMock() self.mock_package_module = MagicMock() self.mock_parent = MagicMock() - self.original_available = translator._argos_available self._sys_modules_patcher: MagicMock | None = None self._ensure_patcher: MagicMock | None = None self._lang_patcher: MagicMock | None = None + self._check_argos_patcher: MagicMock | None = None + self._argos_module_patcher: MagicMock | None = None def __enter__(self) -> MagicMock: """Set up the mocks.""" - translator._argos_available = True - # Set up translate return value - if isinstance(self.translate_returns, Exception) or isinstance( - self.translate_returns, list - ): + if isinstance(self.translate_returns, (Exception, list)): self.mock_translate_fn.side_effect = self.translate_returns elif self.translate_returns is not None: self.mock_translate_fn.return_value = self.translate_returns @@ -96,41 +93,52 @@ class ArgosAvailableMock: }, ) + # Patch the module-level argostranslate reference in translator + self._argos_module_patcher = patch.object( + translator, "argostranslate", self.mock_parent, create=True + ) + # Patch _ensure_argos_installed and _ensure_language_pair to no-op self._ensure_patcher = patch.object( translator, "_ensure_argos_installed", lambda: None ) self._lang_patcher = patch.object( - translator, "_ensure_language_pair", lambda f, t: None + translator, "_ensure_language_pair", lambda _f, _t: None + ) + self._check_argos_patcher = patch.object( + translator, "_check_argos", return_value=True ) self._sys_modules_patcher.start() # type: ignore[union-attr] + self._argos_module_patcher.start() # type: ignore[union-attr] self._ensure_patcher.start() # type: ignore[union-attr] self._lang_patcher.start() # type: ignore[union-attr] + self._check_argos_patcher.start() # type: ignore[union-attr] return self.mock_translate_fn def __exit__(self, *args: object) -> None: """Restore original state.""" + if self._check_argos_patcher: + self._check_argos_patcher.stop() if self._lang_patcher: self._lang_patcher.stop() if self._ensure_patcher: self._ensure_patcher.stop() + if self._argos_module_patcher: + self._argos_module_patcher.stop() if self._sys_modules_patcher: self._sys_modules_patcher.stop() - translator._argos_available = self.original_available # Fixtures @pytest.fixture -def mock_argos_unavailable() -> Generator[None, None, None]: +def _mock_argos_unavailable() -> Generator[None, None, None]: """Mock argostranslate being unavailable (for legacy tests).""" - original_value = translator._argos_available - translator._argos_available = False - yield - translator._argos_available = original_value + with patch.object(translator, "_check_argos", return_value=False): + yield @pytest.fixture @@ -178,7 +186,7 @@ class TestTranslationResult: def test_result_is_tuple(self) -> None: """Test that TranslationResult is a namedtuple.""" - result = TranslationResult("a", "b", "en", "es", True) + result = TranslationResult("a", "b", "en", "es", success=True) assert isinstance(result, tuple) assert len(result) == 6 @@ -192,13 +200,15 @@ class TestTranslateWord: def test_translate_word_argos_unavailable_raises(self) -> None: """Test that translation raises ImportError when argos is unavailable.""" # Mock _ensure_argos_installed to raise ImportError - with patch.object( - translator, - "_ensure_argos_installed", - side_effect=ImportError("argostranslate not available"), + with ( + patch.object( + translator, + "_ensure_argos_installed", + side_effect=ImportError("argostranslate not available"), + ), + pytest.raises(ImportError, match="argostranslate not available"), ): - with pytest.raises(ImportError, match="argostranslate not available"): - translate_word("hello", "en", "es", use_cache=False) + translate_word("hello", "en", "es", use_cache=False) def test_translate_word_success(self) -> None: """Test successful word translation.""" @@ -243,13 +253,15 @@ class TestTranslateWords: def test_translate_words_argos_unavailable_raises(self) -> None: """Test that translating words raises ImportError when argos unavailable.""" - with patch.object( - translator, - "_ensure_argos_installed", - side_effect=ImportError("argostranslate not available"), + with ( + patch.object( + translator, + "_ensure_argos_installed", + side_effect=ImportError("argostranslate not available"), + ), + pytest.raises(ImportError, match="argostranslate not available"), ): - with pytest.raises(ImportError, match="argostranslate not available"): - translate_words(["hello", "world"], "en", "es", use_cache=False) + translate_words(["hello", "world"], "en", "es", use_cache=False) # translate_words_batch tests @@ -290,7 +302,7 @@ class TestTranslateWordsBatch: assert results[4].translated_word == "cinco" def test_batch_fallback_on_mismatch(self) -> None: - """Test batch translation falls back to individual when result count mismatches.""" + """Test batch falls back to individual on result count mismatch.""" words = ["one", "two", "three", "four"] # First call (batch) returns wrong count, subsequent calls are individual with ArgosAvailableMock(["wrong", "uno", "dos", "tres", "cuatro"]) as mock: @@ -313,10 +325,11 @@ class TestTranslateWordsBatch: mock_parent.translate = mock_translate_module mock_parent.package = mock_package_module - original = translator._argos_available - translator._argos_available = True - with ( + patch.object(translator, "_check_argos", return_value=True), + patch.object( + translator, "argostranslate", mock_parent, create=True + ), patch.dict( "sys.modules", { @@ -326,22 +339,22 @@ class TestTranslateWordsBatch: }, ), patch.object(translator, "_ensure_argos_installed", lambda: None), - patch.object(translator, "_ensure_language_pair", lambda f, t: None), + patch.object(translator, "_ensure_language_pair", lambda _f, _t: None), pytest.raises(RuntimeError, match="Translation failed"), ): translate_words_batch(words, "en", "es", use_cache=False) - translator._argos_available = original - def test_batch_argos_unavailable_raises(self) -> None: """Test that batch translation raises ImportError when argos unavailable.""" - with patch.object( - translator, - "_ensure_argos_installed", - side_effect=ImportError("argostranslate not available"), + with ( + patch.object( + translator, + "_ensure_argos_installed", + side_effect=ImportError("argostranslate not available"), + ), + pytest.raises(ImportError, match="argostranslate not available"), ): - with pytest.raises(ImportError, match="argostranslate not available"): - translate_words_batch(["hello", "world"], "en", "es", use_cache=False) + translate_words_batch(["hello", "world"], "en", "es", use_cache=False) # format_translations tests @@ -358,7 +371,7 @@ class TestFormatTranslations: def test_format_single_translation(self) -> None: """Test formatting single translation.""" results = [ - TranslationResult("hello", "hola", "en", "es", True), + TranslationResult("hello", "hola", "en", "es", success=True), ] output = format_translations(results) @@ -369,8 +382,8 @@ class TestFormatTranslations: def test_format_multiple_translations(self) -> None: """Test formatting multiple translations.""" results = [ - TranslationResult("hello", "hola", "en", "es", True), - TranslationResult("world", "mundo", "en", "es", True), + TranslationResult("hello", "hola", "en", "es", success=True), + TranslationResult("world", "mundo", "en", "es", success=True), ] output = format_translations(results) @@ -382,8 +395,10 @@ class TestFormatTranslations: def test_format_with_errors(self) -> None: """Test formatting with failed translations.""" results = [ - TranslationResult("hello", "hola", "en", "es", True), - TranslationResult("xyz", "", "en", "es", False, "Unknown word"), + TranslationResult("hello", "hola", "en", "es", success=True), + TranslationResult( + "xyz", "", "en", "es", success=False, error="Unknown word" + ), ] output = format_translations(results, show_errors=True) @@ -393,8 +408,10 @@ class TestFormatTranslations: def test_format_hide_errors(self) -> None: """Test formatting with errors hidden.""" results = [ - TranslationResult("hello", "hola", "en", "es", True), - TranslationResult("xyz", "", "en", "es", False, "Unknown word"), + TranslationResult("hello", "hola", "en", "es", success=True), + TranslationResult( + "xyz", "", "en", "es", success=False, error="Unknown word" + ), ] output = format_translations(results, show_errors=False) @@ -408,7 +425,7 @@ class TestFormatTranslations: class TestGetInstalledLanguages: """Tests for get_installed_languages function.""" - def test_argos_unavailable(self, mock_argos_unavailable: None) -> None: + def test_argos_unavailable(self, _mock_argos_unavailable: None) -> None: """Test when argos is unavailable.""" result = get_installed_languages() assert result == [] @@ -433,21 +450,22 @@ class TestGetInstalledLanguages: mock_parent.translate = mock_translate_module mock_parent.package = mock_package_module - original = translator._argos_available - translator._argos_available = True - - with patch.dict( - "sys.modules", - { - "argostranslate": mock_parent, - "argostranslate.translate": mock_translate_module, - "argostranslate.package": mock_package_module, - }, + with ( + patch.object(translator, "_check_argos", return_value=True), + patch.object( + translator, "argostranslate", mock_parent, create=True + ), + patch.dict( + "sys.modules", + { + "argostranslate": mock_parent, + "argostranslate.translate": mock_translate_module, + "argostranslate.package": mock_package_module, + }, + ), ): result = get_installed_languages() - translator._argos_available = original - assert ("en", "English") in result assert ("es", "Spanish") in result @@ -458,7 +476,7 @@ class TestGetInstalledLanguages: class TestGetAvailablePackages: """Tests for get_available_packages function.""" - def test_argos_unavailable(self, mock_argos_unavailable: None) -> None: + def test_argos_unavailable(self, _mock_argos_unavailable: None) -> None: """Test when argos is unavailable.""" result = get_available_packages() assert result == [] @@ -470,7 +488,7 @@ class TestGetAvailablePackages: class TestDownloadLanguages: """Tests for download_languages function.""" - def test_argos_unavailable(self, mock_argos_unavailable: None) -> None: + def test_argos_unavailable(self, _mock_argos_unavailable: None) -> None: """Test when argos is unavailable.""" result = download_languages(["en", "es"]) assert result == {} @@ -503,7 +521,7 @@ class TestReadFile: class TestMain: """Tests for main CLI function.""" - def test_argos_unavailable_error(self, mock_argos_unavailable: None) -> None: + def test_argos_unavailable_error(self, _mock_argos_unavailable: None) -> None: """Test error when argos not installed.""" result = main(["--text", "hello", "--from", "en", "--to", "es"]) assert result == 1 @@ -517,21 +535,22 @@ class TestMain: mock_parent.translate = mock_translate_module mock_parent.package = mock_package_module - original = translator._argos_available - translator._argos_available = True - - with patch.dict( - "sys.modules", - { - "argostranslate": mock_parent, - "argostranslate.translate": mock_translate_module, - "argostranslate.package": mock_package_module, - }, + with ( + patch.object(translator, "_check_argos", return_value=True), + patch.object( + translator, "argostranslate", mock_parent, create=True + ), + patch.dict( + "sys.modules", + { + "argostranslate": mock_parent, + "argostranslate.translate": mock_translate_module, + "argostranslate.package": mock_package_module, + }, + ), ): result = main(["--list-languages"]) - translator._argos_available = original - assert result == 0 captured = capsys.readouterr() assert "No languages installed" in captured.out @@ -551,21 +570,22 @@ class TestMain: mock_parent.translate = mock_translate_module mock_parent.package = mock_package_module - original = translator._argos_available - translator._argos_available = True - - with patch.dict( - "sys.modules", - { - "argostranslate": mock_parent, - "argostranslate.translate": mock_translate_module, - "argostranslate.package": mock_package_module, - }, + with ( + patch.object(translator, "_check_argos", return_value=True), + patch.object( + translator, "argostranslate", mock_parent, create=True + ), + patch.dict( + "sys.modules", + { + "argostranslate": mock_parent, + "argostranslate.translate": mock_translate_module, + "argostranslate.package": mock_package_module, + }, + ), ): result = main(["--list-languages"]) - translator._argos_available = original - assert result == 0 captured = capsys.readouterr() assert "en" in captured.out @@ -622,7 +642,6 @@ class TestMain: def test_translate_output_to_file( self, tmp_path: Path, - capsys: pytest.CaptureFixture[str], ) -> None: """Test outputting translations to file.""" output_file = tmp_path / "output.txt" @@ -647,7 +666,9 @@ class TestMain: assert "hello" in content assert "hola" in content - def test_no_input_shows_help(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_no_input_shows_help( + self, + ) -> None: """Test that no input shows help.""" with ArgosAvailableMock(): result = main([]) diff --git a/python_pkg/word_frequency/tests/test_vocabulary_curve.py b/python_pkg/word_frequency/tests/test_vocabulary_curve.py index 352093a..df57291 100755 --- a/python_pkg/word_frequency/tests/test_vocabulary_curve.py +++ b/python_pkg/word_frequency/tests/test_vocabulary_curve.py @@ -89,7 +89,7 @@ class TestExcerptValidity: """Tests that verify excerpts are actually found in the source text.""" def test_excerpt_exists_in_source_text(self, sample_text_file: Path) -> None: - """Test that each excerpt can be found in the source text as contiguous words.""" + """Test that each excerpt can be found in source text.""" import re source_text = sample_text_file.read_text(encoding="utf-8").lower() diff --git a/python_pkg/word_frequency/translator.py b/python_pkg/word_frequency/translator.py index dc36e90..354571a 100755 --- a/python_pkg/word_frequency/translator.py +++ b/python_pkg/word_frequency/translator.py @@ -1,149 +1,163 @@ #!/usr/bin/env python3 -"""Translator - translates words/text between languages. +r"""Translator - translates words/text between languages. This module provides translation capabilities using either: -1. Argos Translate (offline, requires large downloads) - preferred if installed -2. deep-translator (online, uses Google Translate) - lightweight fallback -Usage: +1. Argos Translate (offline, requires large downloads) +2. deep-translator (online, uses Google Translate) + +Usage:: + # Translate a single word - python -m python_pkg.word_frequency.translator --text "hello" --from en --to es + python -m python_pkg.word_frequency.translator \\ + --text "hello" --from en --to es # Translate multiple words - python -m python_pkg.word_frequency.translator --words hello world goodbye --from en --to pl + python -m python_pkg.word_frequency.translator \\ + --words hello world goodbye --from en --to pl # Translate words from a file (one word per line) - python -m python_pkg.word_frequency.translator --words-file words.txt --from la --to en + python -m python_pkg.word_frequency.translator \\ + --words-file words.txt --from la --to en # List available languages - python -m python_pkg.word_frequency.translator --list-languages + python -m python_pkg.word_frequency.translator \\ + --list-languages # Output to file - python -m python_pkg.word_frequency.translator --words-file vocab.txt --from pl --to en --output translations.txt + python -m python_pkg.word_frequency.translator \\ + --words-file vocab.txt --from pl --to en \\ + --output translations.txt -Dependencies (install one): - pip install deep-translator # Lightweight, uses Google Translate (online) - pip install argostranslate # Offline translation (requires ~3GB downloads) +Dependencies (install one):: + + pip install deep-translator + pip install argostranslate """ from __future__ import annotations import argparse +import importlib +import logging +import os from pathlib import Path +import subprocess import sys from typing import TYPE_CHECKING, NamedTuple if TYPE_CHECKING: from collections.abc import Sequence -# Lazy imports for translation backends (may not be installed) -_argos_available: bool | None = None -_deep_translator_available: bool | None = None -_langdetect_available: bool | None = None -_gpu_initialized: bool = False -_gpu_available: bool | None = None +try: + import torch +except ImportError: + torch = None # type: ignore[assignment] + +try: + import argostranslate.package + import argostranslate.translate +except ImportError: + argostranslate = None # type: ignore[assignment] + +try: + from deep_translator import GoogleTranslator +except ImportError: + GoogleTranslator = None + +try: + import langdetect +except ImportError: + langdetect = None # type: ignore[assignment] + +try: + from python_pkg.word_frequency.cache import ( + get_translation_cache, + ) +except ImportError: + get_translation_cache = None + +logger = logging.getLogger(__name__) + +_LANG_DETECT_SAMPLE_SIZE = 5000 +_BATCH_SIZE = 100 + + +class _TranslatorState: + """Holds module-level state for lazy-initialized backends.""" + + gpu_initialized: bool = False def _check_cuda_available() -> bool: """Check if CUDA is available for GPU acceleration.""" - global _gpu_available - if _gpu_available is None: - try: - import torch + return torch is not None and torch.cuda.is_available() - _gpu_available = torch.cuda.is_available() - except ImportError: - _gpu_available = False - return _gpu_available + +def _validate_gpu_device() -> str: + """Validate GPU device availability and return device name. + + Raises: + RuntimeError: If no GPU devices are found. + """ + device_count = torch.cuda.device_count() + if device_count == 0: + msg = "CUDA reports available but no GPU devices found" + raise RuntimeError(msg) + return torch.cuda.get_device_name(0) def _init_gpu_if_available() -> None: """Initialize GPU for argostranslate if CUDA is available. Raises: - RuntimeError: If CUDA is available but GPU initialization fails. + RuntimeError: If CUDA is available but GPU init fails. """ - global _gpu_initialized - if _gpu_initialized: + if _TranslatorState.gpu_initialized: return if not _check_cuda_available(): - _gpu_initialized = True + _TranslatorState.gpu_initialized = True return - import sys - - print("CUDA detected, initializing GPU acceleration...", file=sys.stderr) + logger.info( + "CUDA detected, initializing GPU acceleration..." + ) try: - import torch - - # Force CTranslate2 to use CUDA - device_count = torch.cuda.device_count() - if device_count == 0: - raise RuntimeError("CUDA reports available but no GPU devices found") - - device_name = torch.cuda.get_device_name(0) - print(f" Using GPU: {device_name}", file=sys.stderr) - - # Set environment variable to force GPU usage in argos - import os + device_name = _validate_gpu_device() + logger.info(" Using GPU: %s", device_name) os.environ["CT2_CUDA_ALLOW_FP16"] = "1" os.environ["CT2_USE_EXPERIMENTAL_PACKED_GEMM"] = "1" - _gpu_initialized = True - print(" GPU acceleration enabled.", file=sys.stderr) + _TranslatorState.gpu_initialized = True + logger.info(" GPU acceleration enabled.") except Exception as e: - raise RuntimeError( - f"CUDA is available but GPU initialization failed: {e}\n" - f"This may be due to incompatible CUDA version or driver issues.\n" - f"To disable GPU and use CPU only, set environment variable: CT2_FORCE_CPU=1" - ) from e + msg = ( + f"CUDA is available but GPU initialization failed: " + f"{e}\nThis may be due to incompatible CUDA " + "version or driver issues.\n" + "To disable GPU and use CPU only, set " + "environment variable: CT2_FORCE_CPU=1" + ) + raise RuntimeError(msg) from e def _check_argos() -> bool: """Check if argostranslate is available.""" - global _argos_available - if _argos_available is None: - try: - import argostranslate.package - import argostranslate.translate - - _ = (argostranslate.package, argostranslate.translate) - _argos_available = True - except ImportError: - _argos_available = False - return _argos_available + return argostranslate is not None def _check_deep_translator() -> bool: """Check if deep-translator is available.""" - global _deep_translator_available - if _deep_translator_available is None: - try: - from deep_translator import GoogleTranslator - - _ = GoogleTranslator - _deep_translator_available = True - except ImportError: - _deep_translator_available = False - return _deep_translator_available + return GoogleTranslator is not None def _check_langdetect() -> bool: """Check if langdetect is available.""" - global _langdetect_available - if _langdetect_available is None: - try: - import langdetect - - _ = langdetect - _langdetect_available = True - except ImportError: - _langdetect_available = False - return _langdetect_available + return langdetect is not None def detect_language(text: str) -> str | None: @@ -158,13 +172,14 @@ def detect_language(text: str) -> str | None: if not _check_langdetect(): return None - import langdetect - try: - # Use a sample of the text for detection (faster and more reliable) - sample = text[:5000] if len(text) > 5000 else text - return langdetect.detect(sample) # type: ignore[no-any-return] - except langdetect.LangDetectException: # type: ignore[attr-defined] + sample = ( + text[:_LANG_DETECT_SAMPLE_SIZE] + if len(text) > _LANG_DETECT_SAMPLE_SIZE + else text + ) + return langdetect.detect(sample) # type: ignore[no-any-return,union-attr] + except langdetect.LangDetectException: # type: ignore[attr-defined,union-attr] return None @@ -188,8 +203,6 @@ def get_installed_languages() -> list[tuple[str, str]]: if not _check_argos(): return [] - import argostranslate.translate - languages = argostranslate.translate.get_installed_languages() return [(lang.code, lang.name) for lang in languages] @@ -203,8 +216,6 @@ def get_available_packages() -> list[tuple[str, str, str, str]]: if not _check_argos(): return [] - import argostranslate.package - argostranslate.package.update_package_index() available = argostranslate.package.get_available_packages() return [ @@ -227,12 +238,10 @@ def download_languages(lang_codes: Sequence[str]) -> dict[str, bool]: if not _check_argos(): return {} - import argostranslate.package - results: dict[str, bool] = {} # Update package index - print("Updating package index...") + logger.info("Updating package index...") argostranslate.package.update_package_index() available = argostranslate.package.get_available_packages() @@ -255,13 +264,26 @@ def download_languages(lang_codes: Sequence[str]) -> dict[str, bool]: if pkg_key in available_lookup: pkg = available_lookup[pkg_key] try: - print(f"Downloading {from_code} -> {to_code}...") + logger.info( + "Downloading %s -> %s...", + from_code, + to_code, + ) argostranslate.package.install_from_path(pkg.download()) results[key] = True - print(f" ✓ Installed {from_code} -> {to_code}") - except Exception as e: # noqa: BLE001 + logger.info( + " Installed %s -> %s", + from_code, + to_code, + ) + except (OSError, RuntimeError, ValueError) as e: results[key] = False - print(f" ✗ Failed {from_code} -> {to_code}: {e}") + logger.info( + " Failed %s -> %s: %s", + from_code, + to_code, + e, + ) else: # Package not available results[key] = False @@ -278,32 +300,38 @@ def _ensure_argos_installed() -> None: if _check_argos(): return - import subprocess - import sys - - print("argostranslate not found. Attempting to install...") + logger.info("argostranslate not found. Attempting to install...") try: subprocess.run( [sys.executable, "-m", "pip", "install", "argostranslate"], check=True, capture_output=True, ) - # Reset the check flag and verify - global _argos_available - _argos_available = None - if not _check_argos(): - raise ImportError("argostranslate installation succeeded but import failed") - print("argostranslate installed successfully.") + # Attempt runtime re-import + importlib.import_module("argostranslate.package") + importlib.import_module("argostranslate.translate") + logger.info("argostranslate installed successfully.") except subprocess.CalledProcessError as e: error_msg = e.stderr.decode() if e.stderr else str(e) - raise ImportError( - f"argostranslate is required for offline translation.\n\n" - f"Install manually with one of:\n" - f" pip install argostranslate # In a virtualenv\n" - f" pipx install argostranslate # System-wide via pipx\n" - f" pacman -S python-argostranslate # Arch Linux (if available)\n\n" + msg = ( + "argostranslate is required for offline " + "translation.\n\n" + "Install manually with one of:\n" + " pip install argostranslate" + " # In a virtualenv\n" + " pipx install argostranslate" + " # System-wide via pipx\n" + " pacman -S python-argostranslate" + " # Arch Linux (if available)\n\n" f"Original error: {error_msg}" - ) from e + ) + raise ImportError(msg) from e + except ImportError: + msg = ( + "argostranslate installation succeeded but " + "import failed" + ) + raise ImportError(msg) from None def _ensure_language_pair(from_lang: str, to_lang: str) -> None: @@ -316,11 +344,9 @@ def _ensure_language_pair(from_lang: str, to_lang: str) -> None: Raises: ValueError: If language pair cannot be obtained. """ - import argostranslate.package - import argostranslate.translate - - # Check if already installed - installed_languages = argostranslate.translate.get_installed_languages() + installed_languages = ( + argostranslate.translate.get_installed_languages() + ) from_lang_obj = None to_lang_obj = None @@ -337,37 +363,44 @@ def _ensure_language_pair(from_lang: str, to_lang: str) -> None: return # Already available # Need to download - import sys - - print( - f"Downloading language pack: {from_lang} -> {to_lang}...", - file=sys.stderr, + logger.info( + "Downloading language pack: %s -> %s...", + from_lang, + to_lang, ) - print(" Fetching package index...", file=sys.stderr) + logger.info(" Fetching package index...") argostranslate.package.update_package_index() available = argostranslate.package.get_available_packages() pkg = next( - (p for p in available if p.from_code == from_lang and p.to_code == to_lang), + ( + p + for p in available + if p.from_code == from_lang and p.to_code == to_lang + ), None, ) if pkg is None: - raise ValueError( - f"No language pack available for {from_lang} -> {to_lang}. " - f"Available pairs can be listed with --list-languages." + msg = ( + f"No language pack available for " + f"{from_lang} -> {to_lang}. " + "Available pairs can be listed with " + "--list-languages." ) + raise ValueError(msg) - print( - " Downloading package (~50-100MB, this may take a minute)...", - file=sys.stderr, + logger.info( + " Downloading package (~50-100MB, " + "this may take a minute)...", ) download_path = pkg.download() - print(" Installing language pack...", file=sys.stderr) + logger.info(" Installing language pack...") argostranslate.package.install_from_path(download_path) - print( - f"Language pack {from_lang} -> {to_lang} installed.", - file=sys.stderr, + logger.info( + "Language pack %s -> %s installed.", + from_lang, + to_lang, ) @@ -393,38 +426,30 @@ def translate_word( ImportError: If argostranslate is not available and cannot be installed. """ # Check cache first - if use_cache: - try: - from python_pkg.word_frequency.cache import get_translation_cache - - cache = get_translation_cache() - cached = cache.get(word, from_lang, to_lang) - if cached is not None: - return TranslationResult( - source_word=word, - translated_word=cached, - source_lang=from_lang, - target_lang=to_lang, - success=True, - ) - except ImportError: - pass # Cache not available + if use_cache and get_translation_cache is not None: + cache = get_translation_cache() + cached = cache.get(word, from_lang, to_lang) + if cached is not None: + return TranslationResult( + source_word=word, + translated_word=cached, + source_lang=from_lang, + target_lang=to_lang, + success=True, + ) # Ensure argos is installed (will raise if it can't be) _ensure_argos_installed() - import argostranslate.translate - try: - translated = argostranslate.translate.translate(word, from_lang, to_lang) + translated = argostranslate.translate.translate( + word, from_lang, to_lang, + ) # Cache the result - if use_cache: - try: - from python_pkg.word_frequency.cache import get_translation_cache - - get_translation_cache().set(word, from_lang, to_lang, translated) - except ImportError: - pass + if use_cache and get_translation_cache is not None: + get_translation_cache().set( + word, from_lang, to_lang, translated, + ) return TranslationResult( source_word=word, translated_word=translated, @@ -432,7 +457,7 @@ def translate_word( target_lang=to_lang, success=True, ) - except Exception as e: # noqa: BLE001 + except (OSError, RuntimeError, ValueError, TypeError) as e: return TranslationResult( source_word=word, translated_word="", @@ -483,8 +508,6 @@ def _translate_batch_worker( Returns: Tuple of (batch_idx, translations dict). """ - import argostranslate.translate - translations: dict[str, str] = {} # Batch translate by joining with newlines @@ -507,6 +530,78 @@ def _translate_batch_worker( return batch_idx, translations +def _run_batch_translation( + words_to_translate: list[str], + from_lang: str, + to_lang: str, +) -> dict[str, str]: + """Translate a list of words in batches with progress logging. + + Args: + words_to_translate: Words needing translation. + from_lang: Source language code. + to_lang: Target language code. + + Returns: + Dict mapping lowercased words to translations. + + Raises: + RuntimeError: If translation fails. + """ + new_translations: dict[str, str] = {} + num_to_translate = len(words_to_translate) + + gpu_status = ( + " (GPU)" if _check_cuda_available() else " (CPU)" + ) + logger.info( + "Translating %d words from %s to %s%s...", + num_to_translate, + from_lang, + to_lang, + gpu_status, + ) + + try: + batches = [ + words_to_translate[i : i + _BATCH_SIZE] + for i in range(0, num_to_translate, _BATCH_SIZE) + ] + total_batches = len(batches) + + for batch_idx, batch_words in enumerate(batches): + words_done = min( + (batch_idx + 1) * _BATCH_SIZE, + num_to_translate, + ) + pct = int(words_done / num_to_translate * 100) + + logger.info( + " [%3d%%] Translating batch %d/%d " + "(%d/%d words)...", + pct, + batch_idx + 1, + total_batches, + words_done, + num_to_translate, + ) + + _, batch_translations = _translate_batch_worker( + batch_words, from_lang, to_lang, batch_idx, + ) + new_translations.update(batch_translations) + + logger.info(" Translation complete.") + except Exception as e: + msg = ( + f"Translation failed for " + f"{from_lang} -> {to_lang}: {e}" + ) + raise RuntimeError(msg) from e + + return new_translations + + def translate_words_batch( words: Sequence[str], from_lang: str, @@ -535,90 +630,36 @@ def translate_words_batch( if not words: return [] - # Ensure argos is installed (will raise if it can't be) _ensure_argos_installed() - - # Initialize GPU if available (will raise if CUDA available but fails) _init_gpu_if_available() - - # Ensure language pair is available _ensure_language_pair(from_lang, to_lang) # Check cache for already-translated words cached_results: dict[str, str] = {} - words_to_translate: list[str] = [] - - if use_cache: - try: - from python_pkg.word_frequency.cache import get_translation_cache - - cache = get_translation_cache() - cached_results = cache.get_many(list(words), from_lang, to_lang) - except ImportError: - pass + if use_cache and get_translation_cache is not None: + cache = get_translation_cache() + cached_results = cache.get_many( + list(words), from_lang, to_lang, + ) # Find words that still need translation - for word in words: - if word.lower() not in cached_results: - words_to_translate.append(word) + words_to_translate = [ + word for word in words + if word.lower() not in cached_results + ] # Translate uncached words using argos batch new_translations: dict[str, str] = {} if words_to_translate: - import sys - - num_to_translate = len(words_to_translate) - - # Check if GPU is being used - gpu_status = " (GPU)" if _gpu_available else " (CPU)" - print( - f"Translating {num_to_translate} words from {from_lang} to {to_lang}{gpu_status}...", - file=sys.stderr, - flush=True, + new_translations = _run_batch_translation( + words_to_translate, from_lang, to_lang, ) - try: - # Split into batches - larger batches are faster but show progress less often - BATCH_SIZE = 100 - batches: list[list[str]] = [] - for i in range(0, num_to_translate, BATCH_SIZE): - batches.append(words_to_translate[i : i + BATCH_SIZE]) - - total_batches = len(batches) - - # Sequential translation with progress - # (argostranslate is not thread-safe - uses global model) - for batch_idx, batch_words in enumerate(batches): - words_done = (batch_idx + 1) * BATCH_SIZE - words_done = min(words_done, num_to_translate) - pct = int(words_done / num_to_translate * 100) - - print( - f" [{pct:3d}%] Translating batch {batch_idx + 1}/{total_batches} " - f"({words_done}/{num_to_translate} words)...", - file=sys.stderr, - flush=True, - ) - - _, batch_translations = _translate_batch_worker( - batch_words, from_lang, to_lang, batch_idx - ) - new_translations.update(batch_translations) - - print(" Translation complete.", file=sys.stderr, flush=True) - except Exception as e: - raise RuntimeError( - f"Translation failed for {from_lang} -> {to_lang}: {e}" - ) from e - # Cache new translations - if use_cache and new_translations: - try: - from python_pkg.word_frequency.cache import get_translation_cache - - get_translation_cache().set_many(new_translations, from_lang, to_lang) - except ImportError: - pass + if use_cache and get_translation_cache is not None: + get_translation_cache().set_many( + new_translations, from_lang, to_lang, + ) # Merge cached and new translations all_translations = {**cached_results, **new_translations} @@ -694,22 +735,14 @@ def read_file(filepath: str | Path) -> str: return Path(filepath).read_text(encoding="utf-8") -def main(argv: Sequence[str] | None = None) -> int: - """Main entry point for the translator. - - Args: - argv: Command line arguments. - - Returns: - Exit code. - """ +def _build_parser() -> argparse.ArgumentParser: + """Build the argument parser for the translator CLI.""" parser = argparse.ArgumentParser( description="Offline translator using Argos Translate.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) - # Actions action_group = parser.add_mutually_exclusive_group() action_group.add_argument( "--list-languages", @@ -728,10 +761,12 @@ def main(argv: Sequence[str] | None = None) -> int: "-d", nargs="+", metavar="LANG", - help="Download language packs (e.g., --download en es pl)", + help=( + "Download language packs " + "(e.g., --download en es pl)" + ), ) - # Input input_group = parser.add_mutually_exclusive_group() input_group.add_argument( "--text", @@ -752,7 +787,6 @@ def main(argv: Sequence[str] | None = None) -> int: help="File with words to translate (one per line)", ) - # Language options parser.add_argument( "--from", "-f", @@ -769,8 +803,6 @@ def main(argv: Sequence[str] | None = None) -> int: default="en", help="Target language code (default: en)", ) - - # Output parser.add_argument( "--output", "-o", @@ -778,87 +810,142 @@ def main(argv: Sequence[str] | None = None) -> int: help="Output file path", ) - args = parser.parse_args(argv) + return parser - # Check if argostranslate is available - if not _check_argos(): - print( - "Error: argostranslate is not installed.\n" - "Install it with: pip install argostranslate", - file=sys.stderr, + +def _handle_list_languages() -> int: + """Handle --list-languages command.""" + langs = get_installed_languages() + if not langs: + sys.stdout.write("No languages installed.\n") + sys.stdout.write( + "Download some with: --download en es pl de fr\n", ) - return 1 + else: + sys.stdout.write("Installed languages:\n") + for code, name in sorted(langs): + sys.stdout.write(f" {code}: {name}\n") + return 0 - # Handle list-languages - if args.list_languages: - langs = get_installed_languages() - if not langs: - print("No languages installed.") - print("Download some with: --download en es pl de fr") - else: - print("Installed languages:") - for code, name in sorted(langs): - print(f" {code}: {name}") - return 0 - # Handle list-available - if args.list_available: - packages = get_available_packages() - if not packages: - print("No packages available (check internet connection).") - else: - print("Available language packages:") - for from_code, from_name, to_code, to_name in sorted(packages): - print(f" {from_code} ({from_name}) -> {to_code} ({to_name})") - return 0 +def _handle_list_available() -> int: + """Handle --list-available command.""" + packages = get_available_packages() + if not packages: + sys.stdout.write( + "No packages available " + "(check internet connection).\n", + ) + else: + sys.stdout.write("Available language packages:\n") + for from_code, from_name, to_code, to_name in sorted( + packages, + ): + sys.stdout.write( + f" {from_code} ({from_name})" + f" -> {to_code} ({to_name})\n", + ) + return 0 - # Handle download - if args.download: - download_results = download_languages(args.download) - success_count = sum(1 for v in download_results.values() if v) - print(f"\nDownloaded {success_count}/{len(download_results)} language pairs.") - return 0 if success_count > 0 else 1 - # Handle translation - words: list[str] = [] +def _handle_download(lang_codes: list[str]) -> int: + """Handle --download command.""" + download_results = download_languages(lang_codes) + success_count = sum( + 1 for v in download_results.values() if v + ) + sys.stdout.write( + f"\nDownloaded {success_count}/" + f"{len(download_results)} language pairs.\n", + ) + return 0 if success_count > 0 else 1 + + +def _collect_words( + args: argparse.Namespace, +) -> list[str] | None: + """Collect words from args. Returns None on error.""" if args.text: - words = [args.text] - elif args.words: - words = args.words - elif args.words_file: + return [args.text] + if args.words: + return args.words + if args.words_file: try: content = read_file(args.words_file) - words = [w.strip() for w in content.splitlines() if w.strip()] except FileNotFoundError: - print(f"Error: File not found: {args.words_file}", file=sys.stderr) - return 1 + sys.stderr.write( + f"Error: File not found: {args.words_file}\n", + ) + return None + return [ + w.strip() + for w in content.splitlines() + if w.strip() + ] + return [] - if not words: - parser.print_help() - return 1 - # Translate +def _handle_translation(args: argparse.Namespace) -> int: + """Handle the translation action.""" try: - results = translate_words_batch(words, args.from_lang, args.to_lang) - except ImportError as e: - print(f"Error: {e}", file=sys.stderr) + results = translate_words_batch( + args.words, args.from_lang, args.to_lang, + ) + except ImportError: + logger.exception("Translation import error") return 1 output = format_translations(results) - # Output if args.output: Path(args.output).write_text(output, encoding="utf-8") - print(f"Translations written to {args.output}") + sys.stdout.write( + f"Translations written to {args.output}\n", + ) else: - print(output) + sys.stdout.write(output + "\n") - # Return error if any translation failed if any(not r.success for r in results): return 1 return 0 +def main(argv: Sequence[str] | None = None) -> int: + """Main entry point for the translator. + + Args: + argv: Command line arguments. + + Returns: + Exit code. + """ + parser = _build_parser() + args = parser.parse_args(argv) + + if not _check_argos(): + sys.stderr.write( + "Error: argostranslate is not installed.\n" + "Install it with: pip install argostranslate\n", + ) + return 1 + + if args.list_languages: + return _handle_list_languages() + if args.list_available: + return _handle_list_available() + if args.download: + return _handle_download(args.download) + + words = _collect_words(args) + if not words: + if words is not None: + parser.print_help() + return 1 + + args.words = words + return _handle_translation(args) + + if __name__ == "__main__": sys.exit(main()) diff --git a/python_pkg/word_frequency/vocabulary_curve.py b/python_pkg/word_frequency/vocabulary_curve.py index 46c0e2d..54ca7e5 100755 --- a/python_pkg/word_frequency/vocabulary_curve.py +++ b/python_pkg/word_frequency/vocabulary_curve.py @@ -14,7 +14,9 @@ Usage: from __future__ import annotations import argparse +import logging from pathlib import Path +import re import sys from typing import TYPE_CHECKING, NamedTuple @@ -27,6 +29,9 @@ except ImportError: from analyzer import analyze_text, read_file +logger = logging.getLogger(__name__) + + class ExcerptAnalysis(NamedTuple): """Analysis result for an excerpt length.""" @@ -111,8 +116,6 @@ def find_optimal_excerpts( ranked_words = [word for word, _ in word_counts.most_common()] # Extract all words from text (preserving order) - import re - all_words = re.findall(r"\b[\w]+\b", text, re.UNICODE) if not case_sensitive: all_words = [w.lower() for w in all_words] @@ -150,6 +153,9 @@ def find_optimal_excerpts( return results +_MAX_EXCERPT_DISPLAY_LEN = 50 + + def format_results( results: list[ExcerptAnalysis], *, @@ -198,7 +204,7 @@ def format_results( if show_excerpts: # Truncate long excerpts excerpt = r.best_excerpt - if len(excerpt) > 50: + if len(excerpt) > _MAX_EXCERPT_DISPLAY_LEN: excerpt = excerpt[:47] + "..." lines.append(f"{r.excerpt_length:>6} {r.min_vocab_needed:>5} {excerpt}") else: @@ -285,10 +291,7 @@ def main(argv: Sequence[str] | None = None) -> int: args = parser.parse_args(argv) try: - if args.text: - text = args.text - else: - text = read_file(args.file) + text = args.text or read_file(args.file) results = find_optimal_excerpts( text, @@ -304,15 +307,15 @@ def main(argv: Sequence[str] | None = None) -> int: if args.output: Path(args.output).write_text(output, encoding="utf-8") - print(f"Output written to {args.output}") + logger.info("Output written to %s", args.output) else: - print(output) + logger.info("%s", output) - except FileNotFoundError as e: - print(f"Error: File not found - {e}", file=sys.stderr) + except FileNotFoundError: + logger.exception("File not found") return 1 - except UnicodeDecodeError as e: - print(f"Error: Could not decode file - {e}", file=sys.stderr) + except UnicodeDecodeError: + logger.exception("Could not decode file") return 1 return 0