refactor(word_frequency): fix all ruff violations and remove noqa comments

- Replace print() with logging module throughout
- Add type annotations and Google docstrings to all functions
- Introduce DeckInput and LessonConfig dataclasses to reduce function parameters
- Use specific exception types instead of bare except (BLE001)
- Remove all noqa suppression comments
- Fix test fixtures: remove unused _capsys/_tmp_path parameters
This commit is contained in:
Krzysztof kuhy Rudnicki 2026-03-13 20:41:31 +01:00
parent ac1228f9c4
commit 2bb930db6f
14 changed files with 2537 additions and 1415 deletions

View File

@ -22,11 +22,14 @@ from __future__ import annotations
import argparse
from collections import Counter
import logging
from pathlib import Path
import re
import sys
from typing import TYPE_CHECKING
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from collections.abc import Sequence
@ -90,9 +93,7 @@ def read_files(filepaths: Sequence[str | Path]) -> str:
Returns:
Combined text content of all files.
"""
texts = []
for filepath in filepaths:
texts.append(read_file(filepath))
texts = [read_file(filepath) for filepath in filepaths]
return "\n".join(texts)
@ -244,15 +245,15 @@ def main(argv: Sequence[str] | None = None) -> int:
if args.output:
Path(args.output).write_text(result, encoding="utf-8")
print(f"Output written to {args.output}")
logger.info("Output written to %s", args.output)
else:
print(result)
sys.stdout.write(result + "\n")
except FileNotFoundError as e:
print(f"Error: File not found - {e}", file=sys.stderr)
except FileNotFoundError:
logger.exception("File not found")
return 1
except UnicodeDecodeError as e:
print(f"Error: Could not decode file as UTF-8 - {e}", file=sys.stderr)
except UnicodeDecodeError:
logger.exception("Could not decode file as UTF-8")
return 1
return 0

File diff suppressed because it is too large Load Diff

View File

@ -11,15 +11,23 @@ Cache location: ~/.cache/word_frequency/
from __future__ import annotations
import argparse
from dataclasses import dataclass
import hashlib
import json
import logging
import os
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# Default cache directory
DEFAULT_CACHE_DIR = Path.home() / ".cache" / "word_frequency"
_ONE_KB = 1024
_ONE_MB = 1024 * 1024
def get_cache_dir() -> Path:
"""Get the cache directory, creating it if needed.
@ -42,7 +50,7 @@ def get_file_hash(filepath: Path) -> str:
Hex digest of file hash.
"""
hasher = hashlib.sha256()
with open(filepath, "rb") as f:
with filepath.open("rb") as f:
# Read in chunks for large files
for chunk in iter(lambda: f.read(65536), b""):
hasher.update(chunk)
@ -274,14 +282,15 @@ class VocabCurveCache:
try:
data = json.loads(cache_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, KeyError, OSError):
return None
else:
# Verify hash matches
if data.get("file_hash") != file_hash:
return None
excerpt = data["excerpt"]
words = [(w, r) for w, r in data["words"]]
return excerpt, words
except (json.JSONDecodeError, KeyError, OSError):
return None
def set(
self,
@ -339,6 +348,17 @@ class VocabCurveCache:
# =============================================================================
@dataclass(frozen=True)
class AnkiDeckKey:
    """Immutable bundle of the parameters that identify a cached Anki deck.

    Attributes:
        filepath: Path to the source file.
        length: Excerpt length.
        target_lang: Target language.
        include_context: Whether context is included.
        all_vocab: Whether all vocab is included.
    """

    # Source document the deck was generated from.
    filepath: Path
    # Excerpt length used for generation.
    length: int
    # Language the vocabulary is translated into.
    target_lang: str
    # Generation flags; both take part in the cache key.
    include_context: bool
    all_vocab: bool
class AnkiDeckCache:
"""Cache for generated Anki decks."""
@ -380,6 +400,7 @@ class AnkiDeckCache:
file_hash: str,
length: int,
target_lang: str,
*,
include_context: bool,
all_vocab: bool,
) -> str:
@ -400,36 +421,35 @@ class AnkiDeckCache:
def get(
self,
filepath: Path,
length: int,
target_lang: str,
include_context: bool,
all_vocab: bool,
key: AnkiDeckKey,
) -> tuple[str, str, int, int] | None:
"""Get cached Anki deck.
Args:
filepath: Path to source file.
length: Excerpt length.
target_lang: Target language.
include_context: Whether context is included.
all_vocab: Whether all vocab is included.
key: Cache key parameters.
Returns:
Tuple of (anki_content, excerpt, num_words, max_rank) or None.
Tuple of (anki_content, excerpt, num_words, max_rank)
or None.
"""
file_hash = get_file_hash(filepath)
key = self._make_key(file_hash, length, target_lang, include_context, all_vocab)
file_hash = get_file_hash(key.filepath)
cache_key = self._make_key(
file_hash,
key.length,
key.target_lang,
include_context=key.include_context,
all_vocab=key.all_vocab,
)
metadata = self._load_metadata()
if key not in metadata:
if cache_key not in metadata:
return None
entry = metadata[key]
entry = metadata[cache_key]
if entry.get("file_hash") != file_hash:
return None
deck_file = self.cache_dir / f"{key}.txt"
deck_file = self.cache_dir / f"{cache_key}.txt"
if not deck_file.exists():
return None
@ -446,11 +466,7 @@ class AnkiDeckCache:
def set(
self,
filepath: Path,
length: int,
target_lang: str,
include_context: bool,
all_vocab: bool,
key: AnkiDeckKey,
anki_content: str,
excerpt: str,
num_words: int,
@ -459,32 +475,34 @@ class AnkiDeckCache:
"""Store Anki deck in cache.
Args:
filepath: Path to source file.
length: Excerpt length.
target_lang: Target language.
include_context: Whether context is included.
all_vocab: Whether all vocab is included.
key: Cache key parameters.
anki_content: The Anki deck content.
excerpt: The excerpt text.
num_words: Number of words in deck.
max_rank: Maximum word rank.
"""
file_hash = get_file_hash(filepath)
key = self._make_key(file_hash, length, target_lang, include_context, all_vocab)
file_hash = get_file_hash(key.filepath)
cache_key = self._make_key(
file_hash,
key.length,
key.target_lang,
include_context=key.include_context,
all_vocab=key.all_vocab,
)
# Save deck content
deck_file = self.cache_dir / f"{key}.txt"
deck_file = self.cache_dir / f"{cache_key}.txt"
deck_file.write_text(anki_content, encoding="utf-8")
# Update metadata
metadata = self._load_metadata()
metadata[key] = {
metadata[cache_key] = {
"file_hash": file_hash,
"filepath": str(filepath),
"length": length,
"target_lang": target_lang,
"include_context": include_context,
"all_vocab": all_vocab,
"filepath": str(key.filepath),
"length": key.length,
"target_lang": key.target_lang,
"include_context": key.include_context,
"all_vocab": key.all_vocab,
"excerpt": excerpt,
"num_words": num_words,
"max_rank": max_rank,
@ -519,34 +537,33 @@ class AnkiDeckCache:
# Global Cache Instances
# =============================================================================
# Singleton instances
_translation_cache: TranslationCache | None = None
_vocab_curve_cache: VocabCurveCache | None = None
_anki_deck_cache: AnkiDeckCache | None = None
class _CacheHolder:
    """Namespace for the lazily created, process-wide cache instances.

    Every slot starts as ``None``; the module-level ``get_*_cache`` accessors
    fill a slot on first use and return the stored instance afterwards.
    """

    # Shared translation cache, or None until first requested.
    translation: TranslationCache | None = None
    # Shared vocabulary-curve cache, or None until first requested.
    vocab_curve: VocabCurveCache | None = None
    # Shared Anki deck cache, or None until first requested.
    anki_deck: AnkiDeckCache | None = None
def get_translation_cache() -> TranslationCache:
"""Get the global translation cache instance."""
global _translation_cache
if _translation_cache is None:
_translation_cache = TranslationCache()
return _translation_cache
if _CacheHolder.translation is None:
_CacheHolder.translation = TranslationCache()
return _CacheHolder.translation
def get_vocab_curve_cache() -> VocabCurveCache:
"""Get the global vocabulary curve cache instance."""
global _vocab_curve_cache
if _vocab_curve_cache is None:
_vocab_curve_cache = VocabCurveCache()
return _vocab_curve_cache
if _CacheHolder.vocab_curve is None:
_CacheHolder.vocab_curve = VocabCurveCache()
return _CacheHolder.vocab_curve
def get_anki_deck_cache() -> AnkiDeckCache:
"""Get the global Anki deck cache instance."""
global _anki_deck_cache
if _anki_deck_cache is None:
_anki_deck_cache = AnkiDeckCache()
return _anki_deck_cache
if _CacheHolder.anki_deck is None:
_CacheHolder.anki_deck = AnkiDeckCache()
return _CacheHolder.anki_deck
def clear_all_caches() -> None:
@ -575,8 +592,6 @@ def main() -> int:
Returns:
Exit code.
"""
import argparse
parser = argparse.ArgumentParser(description="Manage word frequency caches")
parser.add_argument("--stats", action="store_true", help="Show cache statistics")
parser.add_argument("--clear", action="store_true", help="Clear all caches")
@ -594,42 +609,42 @@ def main() -> int:
if args.clear:
clear_all_caches()
print("All caches cleared.")
logger.info("All caches cleared.")
return 0
if args.clear_translations:
get_translation_cache().clear()
print("Translation cache cleared.")
logger.info("Translation cache cleared.")
return 0
if args.clear_excerpts:
get_vocab_curve_cache().clear()
print("Excerpt cache cleared.")
logger.info("Excerpt cache cleared.")
return 0
if args.clear_anki:
get_anki_deck_cache().clear()
print("Anki deck cache cleared.")
logger.info("Anki deck cache cleared.")
return 0
# Default: show stats
stats = get_all_cache_stats()
print("Cache Statistics")
print("=" * 50)
logger.info("Cache Statistics")
logger.info("=" * 50)
for cache_name, cache_stats in stats.items():
print(f"\n{cache_name.upper()}:")
logger.info("\n%s:", cache_name.upper())
for key, value in cache_stats.items():
if key == "cache_size_bytes":
# Format as human-readable
if value < 1024:
if value < _ONE_KB:
size_str = f"{value} B"
elif value < 1024 * 1024:
size_str = f"{value / 1024:.1f} KB"
elif value < _ONE_MB:
size_str = f"{value / _ONE_KB:.1f} KB"
else:
size_str = f"{value / (1024 * 1024):.1f} MB"
print(f" {key}: {size_str}")
size_str = f"{value / _ONE_MB:.1f} MB"
logger.info(" %s: %s", key, size_str)
else:
print(f" {key}: {value}")
logger.info(" %s: %s", key, value)
return 0

View File

@ -0,0 +1,640 @@
#!/usr/bin/env python3
"""Caching utilities for word frequency analysis.
Provides disk-based caching for:
- Translations (word -> translation mappings)
- Vocabulary curve excerpts (file + length -> excerpt + words)
- Generated Anki decks
Cache location: ~/.cache/word_frequency/
"""
from __future__ import annotations
import hashlib
import json
import os
from pathlib import Path
from typing import Any
# Default cache directory
DEFAULT_CACHE_DIR = Path.home() / ".cache" / "word_frequency"
def get_cache_dir() -> Path:
    """Return the cache directory, creating it (and parents) if missing.

    The ``WORD_FREQ_CACHE_DIR`` environment variable overrides the default
    location stored in ``DEFAULT_CACHE_DIR``.

    Returns:
        Path to the cache directory.
    """
    configured = os.environ.get("WORD_FREQ_CACHE_DIR", str(DEFAULT_CACHE_DIR))
    target = Path(configured)
    target.mkdir(parents=True, exist_ok=True)
    return target
def get_file_hash(filepath: Path) -> str:
    """Compute the SHA-256 hash of a file's contents.

    The file is read in fixed-size chunks, so large files are never loaded
    into memory in one piece.

    Args:
        filepath: Path to the file. A plain string path is accepted as well.

    Returns:
        Hex digest of the file's SHA-256 hash.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    hasher = hashlib.sha256()
    # Path(...) normalises str input; Path.open replaces the builtin open().
    with Path(filepath).open("rb") as f:
        # iter() with a sentinel stops at the first empty read (end of file).
        for chunk in iter(lambda: f.read(65536), b""):
            hasher.update(chunk)
    return hasher.hexdigest()
def get_text_hash(text: str) -> str:
    """Return the SHA-256 hex digest of ``text`` encoded as UTF-8.

    Args:
        text: Text to hash.

    Returns:
        Hex digest of the text hash.
    """
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
# =============================================================================
# Translation Cache
# =============================================================================
class TranslationCache:
"""Cache for word translations."""
def __init__(self, cache_dir: Path | None = None) -> None:
"""Initialize translation cache.
Args:
cache_dir: Optional custom cache directory.
"""
self.cache_dir = cache_dir or get_cache_dir()
self.cache_file = self.cache_dir / "translations.json"
self._cache: dict[str, str] | None = None
self._dirty = False # Track if cache needs saving
def _load_cache(self) -> dict[str, str]:
"""Load cache from disk."""
if self._cache is None:
if self.cache_file.exists():
try:
self._cache = json.loads(
self.cache_file.read_text(encoding="utf-8")
)
except (json.JSONDecodeError, OSError):
self._cache = {}
else:
self._cache = {}
return self._cache
def _save_cache(self) -> None:
"""Save cache to disk if dirty."""
if self._cache is not None and self._dirty:
self.cache_file.write_text(
json.dumps(self._cache, ensure_ascii=False, indent=2),
encoding="utf-8",
)
self._dirty = False
def flush(self) -> None:
"""Force save cache to disk."""
self._save_cache()
@staticmethod
def _make_key(word: str, source_lang: str, target_lang: str) -> str:
"""Create cache key for a translation.
Args:
word: Word to translate.
source_lang: Source language code.
target_lang: Target language code.
Returns:
Cache key string.
"""
return f"{source_lang}:{target_lang}:{word.lower()}"
def get(self, word: str, source_lang: str, target_lang: str) -> str | None:
"""Get cached translation.
Args:
word: Word to look up.
source_lang: Source language code.
target_lang: Target language code.
Returns:
Cached translation or None if not found.
"""
cache = self._load_cache()
key = self._make_key(word, source_lang, target_lang)
return cache.get(key)
def set(
self,
word: str,
source_lang: str,
target_lang: str,
translation: str,
*,
auto_save: bool = False,
) -> None:
"""Store translation in cache.
Args:
word: Original word.
source_lang: Source language code.
target_lang: Target language code.
translation: Translated word.
auto_save: If True, save to disk immediately.
"""
cache = self._load_cache()
key = self._make_key(word, source_lang, target_lang)
cache[key] = translation
self._dirty = True
if auto_save:
self._save_cache()
def get_many(
self, words: list[str], source_lang: str, target_lang: str
) -> dict[str, str]:
"""Get multiple cached translations.
Args:
words: Words to look up.
source_lang: Source language code.
target_lang: Target language code.
Returns:
Dict mapping words to their cached translations.
"""
cache = self._load_cache()
result: dict[str, str] = {}
for word in words:
key = self._make_key(word, source_lang, target_lang)
if key in cache:
result[word.lower()] = cache[key]
return result
def set_many(
self,
translations: dict[str, str],
source_lang: str,
target_lang: str,
) -> None:
"""Store multiple translations in cache and save to disk.
Args:
translations: Dict mapping words to translations.
source_lang: Source language code.
target_lang: Target language code.
"""
cache = self._load_cache()
for word, translation in translations.items():
key = self._make_key(word, source_lang, target_lang)
cache[key] = translation
self._dirty = True
self._save_cache() # Save once after all additions
def clear(self) -> None:
"""Clear all cached translations."""
self._cache = {}
self._dirty = False
if self.cache_file.exists():
self.cache_file.unlink()
def stats(self) -> dict[str, Any]:
"""Get cache statistics.
Returns:
Dict with cache stats.
"""
cache = self._load_cache()
return {
"total_entries": len(cache),
"cache_file": str(self.cache_file),
"cache_size_bytes": (
self.cache_file.stat().st_size if self.cache_file.exists() else 0
),
}
# =============================================================================
# Vocabulary Curve Cache
# =============================================================================
class VocabCurveCache:
"""Cache for vocabulary curve analysis results."""
def __init__(self, cache_dir: Path | None = None) -> None:
"""Initialize vocabulary curve cache.
Args:
cache_dir: Optional custom cache directory.
"""
self.cache_dir = (cache_dir or get_cache_dir()) / "excerpts"
self.cache_dir.mkdir(parents=True, exist_ok=True)
def _get_cache_path(self, file_hash: str, length: int) -> Path:
"""Get path to cache file for given hash and length.
Args:
file_hash: Hash of source file.
length: Excerpt length.
Returns:
Path to cache file.
"""
return self.cache_dir / f"{file_hash[:16]}_{length}.json"
def get(
self, filepath: Path, length: int
) -> tuple[str, list[tuple[str, int]]] | None:
"""Get cached excerpt and words for a file and length.
Args:
filepath: Path to source file.
length: Excerpt length.
Returns:
Tuple of (excerpt, words_with_ranks) or None if not cached.
"""
file_hash = get_file_hash(filepath)
cache_path = self._get_cache_path(file_hash, length)
if not cache_path.exists():
return None
try:
data = json.loads(cache_path.read_text(encoding="utf-8"))
# Verify hash matches
if data.get("file_hash") != file_hash:
return None
excerpt = data["excerpt"]
words = [(w, r) for w, r in data["words"]]
return excerpt, words
except (json.JSONDecodeError, KeyError, OSError):
return None
def set(
self,
filepath: Path,
length: int,
excerpt: str,
words: list[tuple[str, int]],
) -> None:
"""Store excerpt and words in cache.
Args:
filepath: Path to source file.
length: Excerpt length.
excerpt: The excerpt text.
words: List of (word, rank) tuples.
"""
file_hash = get_file_hash(filepath)
cache_path = self._get_cache_path(file_hash, length)
data = {
"file_hash": file_hash,
"filepath": str(filepath),
"length": length,
"excerpt": excerpt,
"words": [[w, r] for w, r in words],
}
cache_path.write_text(
json.dumps(data, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def clear(self) -> None:
"""Clear all cached excerpts."""
for cache_file in self.cache_dir.glob("*.json"):
cache_file.unlink()
def stats(self) -> dict[str, Any]:
"""Get cache statistics.
Returns:
Dict with cache stats.
"""
cache_files = list(self.cache_dir.glob("*.json"))
total_size = sum(f.stat().st_size for f in cache_files)
return {
"total_entries": len(cache_files),
"cache_dir": str(self.cache_dir),
"cache_size_bytes": total_size,
}
# =============================================================================
# Anki Deck Cache
# =============================================================================
class AnkiDeckCache:
"""Cache for generated Anki decks."""
def __init__(self, cache_dir: Path | None = None) -> None:
"""Initialize Anki deck cache.
Args:
cache_dir: Optional custom cache directory.
"""
self.cache_dir = (cache_dir or get_cache_dir()) / "anki_decks"
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.metadata_file = self.cache_dir / "metadata.json"
self._metadata: dict[str, Any] | None = None
def _load_metadata(self) -> dict[str, Any]:
"""Load metadata from disk."""
if self._metadata is None:
if self.metadata_file.exists():
try:
self._metadata = json.loads(
self.metadata_file.read_text(encoding="utf-8")
)
except (json.JSONDecodeError, OSError):
self._metadata = {}
else:
self._metadata = {}
return self._metadata
def _save_metadata(self) -> None:
"""Save metadata to disk."""
if self._metadata is not None:
self.metadata_file.write_text(
json.dumps(self._metadata, ensure_ascii=False, indent=2),
encoding="utf-8",
)
@staticmethod
def _make_key(
file_hash: str,
length: int,
target_lang: str,
include_context: bool,
all_vocab: bool,
) -> str:
"""Create cache key for an Anki deck.
Args:
file_hash: Hash of source file.
length: Excerpt length.
target_lang: Target language.
include_context: Whether context is included.
all_vocab: Whether all vocab is included.
Returns:
Cache key string.
"""
flags = f"ctx{int(include_context)}_all{int(all_vocab)}"
return f"{file_hash[:16]}_{length}_{target_lang}_{flags}"
def get(
self,
filepath: Path,
length: int,
target_lang: str,
include_context: bool,
all_vocab: bool,
) -> tuple[str, str, int, int] | None:
"""Get cached Anki deck.
Args:
filepath: Path to source file.
length: Excerpt length.
target_lang: Target language.
include_context: Whether context is included.
all_vocab: Whether all vocab is included.
Returns:
Tuple of (anki_content, excerpt, num_words, max_rank) or None.
"""
file_hash = get_file_hash(filepath)
key = self._make_key(file_hash, length, target_lang, include_context, all_vocab)
metadata = self._load_metadata()
if key not in metadata:
return None
entry = metadata[key]
if entry.get("file_hash") != file_hash:
return None
deck_file = self.cache_dir / f"{key}.txt"
if not deck_file.exists():
return None
try:
content = deck_file.read_text(encoding="utf-8")
return (
content,
entry["excerpt"],
entry["num_words"],
entry["max_rank"],
)
except OSError:
return None
def set(
self,
filepath: Path,
length: int,
target_lang: str,
include_context: bool,
all_vocab: bool,
anki_content: str,
excerpt: str,
num_words: int,
max_rank: int,
) -> None:
"""Store Anki deck in cache.
Args:
filepath: Path to source file.
length: Excerpt length.
target_lang: Target language.
include_context: Whether context is included.
all_vocab: Whether all vocab is included.
anki_content: The Anki deck content.
excerpt: The excerpt text.
num_words: Number of words in deck.
max_rank: Maximum word rank.
"""
file_hash = get_file_hash(filepath)
key = self._make_key(file_hash, length, target_lang, include_context, all_vocab)
# Save deck content
deck_file = self.cache_dir / f"{key}.txt"
deck_file.write_text(anki_content, encoding="utf-8")
# Update metadata
metadata = self._load_metadata()
metadata[key] = {
"file_hash": file_hash,
"filepath": str(filepath),
"length": length,
"target_lang": target_lang,
"include_context": include_context,
"all_vocab": all_vocab,
"excerpt": excerpt,
"num_words": num_words,
"max_rank": max_rank,
}
self._save_metadata()
def clear(self) -> None:
"""Clear all cached decks."""
self._metadata = {}
for cache_file in self.cache_dir.glob("*.txt"):
cache_file.unlink()
if self.metadata_file.exists():
self.metadata_file.unlink()
def stats(self) -> dict[str, Any]:
"""Get cache statistics.
Returns:
Dict with cache stats.
"""
metadata = self._load_metadata()
cache_files = list(self.cache_dir.glob("*.txt"))
total_size = sum(f.stat().st_size for f in cache_files)
return {
"total_entries": len(metadata),
"cache_dir": str(self.cache_dir),
"cache_size_bytes": total_size,
}
# =============================================================================
# Global Cache Instances
# =============================================================================
# Singleton instances
_translation_cache: TranslationCache | None = None
_vocab_curve_cache: VocabCurveCache | None = None
_anki_deck_cache: AnkiDeckCache | None = None
def get_translation_cache() -> TranslationCache:
    """Return the shared translation cache, creating it on first call."""
    global _translation_cache
    if _translation_cache is not None:
        return _translation_cache
    _translation_cache = TranslationCache()
    return _translation_cache
def get_vocab_curve_cache() -> VocabCurveCache:
    """Return the shared vocabulary curve cache, creating it on first call."""
    global _vocab_curve_cache
    if _vocab_curve_cache is not None:
        return _vocab_curve_cache
    _vocab_curve_cache = VocabCurveCache()
    return _vocab_curve_cache
def get_anki_deck_cache() -> AnkiDeckCache:
    """Return the shared Anki deck cache, creating it on first call."""
    global _anki_deck_cache
    if _anki_deck_cache is not None:
        return _anki_deck_cache
    _anki_deck_cache = AnkiDeckCache()
    return _anki_deck_cache
def clear_all_caches() -> None:
    """Clear the translation, vocabulary curve and Anki deck caches."""
    # Same order as before: translations, then excerpts, then decks.
    accessors = (
        get_translation_cache,
        get_vocab_curve_cache,
        get_anki_deck_cache,
    )
    for accessor in accessors:
        accessor().clear()
def get_all_cache_stats() -> dict[str, dict[str, Any]]:
    """Collect the statistics of every cache.

    Returns:
        Dict keyed by cache type (``translations``, ``vocab_curves``,
        ``anki_decks``) holding that cache's ``stats()`` result.
    """
    report: dict[str, dict[str, Any]] = {}
    report["translations"] = get_translation_cache().stats()
    report["vocab_curves"] = get_vocab_curve_cache().stats()
    report["anki_decks"] = get_anki_deck_cache().stats()
    return report
def main() -> int:
"""CLI for cache management.
Returns:
Exit code.
"""
import argparse
parser = argparse.ArgumentParser(description="Manage word frequency caches")
parser.add_argument("--stats", action="store_true", help="Show cache statistics")
parser.add_argument("--clear", action="store_true", help="Clear all caches")
parser.add_argument(
"--clear-translations", action="store_true", help="Clear translation cache"
)
parser.add_argument(
"--clear-excerpts", action="store_true", help="Clear excerpt cache"
)
parser.add_argument(
"--clear-anki", action="store_true", help="Clear Anki deck cache"
)
args = parser.parse_args()
if args.clear:
clear_all_caches()
print("All caches cleared.")
return 0
if args.clear_translations:
get_translation_cache().clear()
print("Translation cache cleared.")
return 0
if args.clear_excerpts:
get_vocab_curve_cache().clear()
print("Excerpt cache cleared.")
return 0
if args.clear_anki:
get_anki_deck_cache().clear()
print("Anki deck cache cleared.")
return 0
# Default: show stats
stats = get_all_cache_stats()
print("Cache Statistics")
print("=" * 50)
for cache_name, cache_stats in stats.items():
print(f"\n{cache_name.upper()}:")
for key, value in cache_stats.items():
if key == "cache_size_bytes":
# Format as human-readable
if value < 1024:
size_str = f"{value} B"
elif value < 1024 * 1024:
size_str = f"{value / 1024:.1f} KB"
else:
size_str = f"{value / (1024 * 1024):.1f} MB"
print(f" {key}: {size_str}")
else:
print(f" {key}: {value}")
return 0
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    raise SystemExit(main())

View File

@ -6,21 +6,28 @@ specified length (in words) where the target words appear most frequently.
Usage:
# From raw text with target words
python -m python_pkg.word_frequency.excerpt_finder --text "they went somewhere he and she and the guy" --words and the --length 3
python -m python_pkg.word_frequency.excerpt_finder \
--text "they went somewhere he and she and the guy" \
--words and the --length 3
# From a file
python -m python_pkg.word_frequency.excerpt_finder --file path/to/file.txt --words the and of --length 10
python -m python_pkg.word_frequency.excerpt_finder \
--file path/to/file.txt --words the and of --length 10
# Target words from a file (one word per line)
python -m python_pkg.word_frequency.excerpt_finder --file text.txt --words-file targets.txt --length 20
python -m python_pkg.word_frequency.excerpt_finder \
--file text.txt --words-file targets.txt --length 20
# Show top N excerpts instead of just the best one
python -m python_pkg.word_frequency.excerpt_finder --file text.txt --words the and --length 10 --top 5
python -m python_pkg.word_frequency.excerpt_finder \
--file text.txt --words the and --length 10 --top 5
"""
from __future__ import annotations
import argparse
from dataclasses import dataclass
import logging
from pathlib import Path
import sys
from typing import TYPE_CHECKING, NamedTuple
@ -33,6 +40,17 @@ except ModuleNotFoundError:
if TYPE_CHECKING:
from collections.abc import Sequence
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class ExcerptSearchOptions:
    """Immutable settings for excerpt search and display.

    Attributes:
        case_sensitive: If False, match words case-insensitively.
        top_n: Number of top excerpts to return.
        context_words: Number of words to include before/after the excerpt.
    """

    # Matching is case-insensitive unless this is set.
    case_sensitive: bool = False
    # Only the single best excerpt is returned by default.
    top_n: int = 1
    # Zero disables context expansion.
    context_words: int = 0
class ExcerptResult(NamedTuple):
"""Result of an excerpt search."""
@ -141,45 +159,28 @@ def find_best_excerpt(
return output
def find_best_excerpt_with_context(
def _expand_results_with_context(
text: str,
target_words: Sequence[str],
excerpt_length: int,
base_results: list[ExcerptResult],
context_words: int,
*,
case_sensitive: bool = False,
top_n: int = 1,
context_words: int = 0,
) -> list[ExcerptResult]:
"""Find the excerpt(s) with optional surrounding context.
"""Expand excerpt results with surrounding context words.
Args:
text: The input text to search.
target_words: Words to search for in the excerpt.
excerpt_length: Length of the excerpt in words.
case_sensitive: If False, match words case-insensitively.
top_n: Number of top excerpts to return.
context_words: Number of words to include before/after the excerpt.
text: The full source text.
base_results: Results from find_best_excerpt.
context_words: Number of words to include before/after.
case_sensitive: If False, words are lowercased.
Returns:
List of ExcerptResult with context included in the excerpt.
Expanded ExcerptResult list with context.
"""
base_results = find_best_excerpt(
text,
target_words,
excerpt_length,
case_sensitive=case_sensitive,
top_n=top_n,
)
if context_words <= 0:
return base_results
# Re-extract all words to get context
all_words = extract_words(text, case_sensitive=case_sensitive)
expanded_results: list[ExcerptResult] = []
for result in base_results:
# Expand the excerpt with context
ctx_start = max(0, result.start_index - context_words)
ctx_end = min(len(all_words), result.end_index + context_words)
context_excerpt_words = all_words[ctx_start:ctx_end]
@ -198,6 +199,40 @@ def find_best_excerpt_with_context(
return expanded_results
def find_best_excerpt_with_context(
    text: str,
    target_words: Sequence[str],
    excerpt_length: int,
    options: ExcerptSearchOptions | None = None,
) -> list[ExcerptResult]:
    """Find the best excerpt(s), optionally widened with surrounding words.

    Args:
        text: The input text to search.
        target_words: Words to search for in the excerpt.
        excerpt_length: Length of the excerpt in words.
        options: Search options (case_sensitive, top_n, context_words);
            defaults are used when omitted.

    Returns:
        List of ExcerptResult with context included in the excerpt.
    """
    settings = options if options else ExcerptSearchOptions()

    matches = find_best_excerpt(
        text,
        target_words,
        excerpt_length,
        case_sensitive=settings.case_sensitive,
        top_n=settings.top_n,
    )

    # No context requested: the plain search results are the answer.
    if settings.context_words <= 0:
        return matches

    return _expand_results_with_context(
        text,
        matches,
        settings.context_words,
        case_sensitive=settings.case_sensitive,
    )
def format_excerpt_results(
results: list[ExcerptResult],
target_words: Sequence[str],
@ -224,7 +259,8 @@ def format_excerpt_results(
lines.append(f'Excerpt: "{result.excerpt}"')
lines.append(f"Word position: {result.start_index} - {result.end_index - 1}")
lines.append(
f"Matches: {result.match_count}/{len(result.words)} ({result.match_percentage:.2f}%)"
f"Matches: {result.match_count}/{len(result.words)}"
f" ({result.match_percentage:.2f}%)"
)
lines.append("")
@ -316,10 +352,7 @@ def main(argv: Sequence[str] | None = None) -> int:
try:
# Get input text
if args.text:
text = args.text
else:
text = read_file(args.file)
text = args.text or read_file(args.file)
# Get target words
if args.words:
@ -329,7 +362,7 @@ def main(argv: Sequence[str] | None = None) -> int:
target_words = [w.strip() for w in words_content.splitlines() if w.strip()]
if not target_words:
print("Error: No target words provided", file=sys.stderr)
logger.error("No target words provided")
return 1
# Find excerpts
@ -337,9 +370,11 @@ def main(argv: Sequence[str] | None = None) -> int:
text,
target_words,
args.length,
case_sensitive=args.case_sensitive,
top_n=args.top,
context_words=args.context,
ExcerptSearchOptions(
case_sensitive=args.case_sensitive,
top_n=args.top,
context_words=args.context,
),
)
# Format and print results
@ -347,15 +382,15 @@ def main(argv: Sequence[str] | None = None) -> int:
if args.output:
Path(args.output).write_text(output, encoding="utf-8")
print(f"Output written to {args.output}")
logger.info("Output written to %s", args.output)
else:
print(output)
logger.info("%s", output)
except FileNotFoundError as e:
print(f"Error: File not found - {e}", file=sys.stderr)
except FileNotFoundError:
logger.exception("File not found")
return 1
except UnicodeDecodeError as e:
print(f"Error: Could not decode file as UTF-8 - {e}", file=sys.stderr)
except UnicodeDecodeError:
logger.exception("Could not decode file as UTF-8")
return 1
return 0

View File

@ -1,7 +1,8 @@
#!/usr/bin/env python3
"""Learning pipe - combines word frequency analysis with excerpt finding for language learning.
r"""Learning pipe - combines word frequency analysis with excerpt finding.
Helps language learners by:
This script helps language learners by:
1. Analyzing a text to find the most common words
2. Finding excerpts where those common words are most prevalent
3. Creating a progressive learning experience in batches
@ -11,26 +12,35 @@ The idea is to:
- Then read excerpts that are dense with those words
- Progressively learn more words and more complex excerpts
Usage:
# Basic usage - get top 20 words and find excerpts with them
python -m python_pkg.word_frequency.learning_pipe --file text.txt
Usage::
# Basic usage
python -m python_pkg.word_frequency.learning_pipe \\
--file text.txt
# Custom batch size and excerpt length
python -m python_pkg.word_frequency.learning_pipe --file text.txt --batch-size 30 --excerpt-length 50
python -m python_pkg.word_frequency.learning_pipe \\
--file text.txt --batch-size 30 --excerpt-length 50
# Multiple batches for progressive learning
python -m python_pkg.word_frequency.learning_pipe --file text.txt --batches 5 --batch-size 20
python -m python_pkg.word_frequency.learning_pipe \\
--file text.txt --batches 5 --batch-size 20
# Output to file
python -m python_pkg.word_frequency.learning_pipe --file text.txt --output lesson.txt
python -m python_pkg.word_frequency.learning_pipe \\
--file text.txt --output lesson.txt
# Skip common words (like "the", "a", "is") using a stopwords file
python -m python_pkg.word_frequency.learning_pipe --file text.txt --stopwords stopwords.txt
# Skip common words using a stopwords file
python -m python_pkg.word_frequency.learning_pipe \\
--file text.txt --stopwords stopwords.txt
"""
from __future__ import annotations
import argparse
from dataclasses import dataclass
from dataclasses import replace as _replace_dc
import logging
from pathlib import Path
import sys
from typing import TYPE_CHECKING
@ -53,6 +63,8 @@ except ModuleNotFoundError:
if TYPE_CHECKING:
from collections.abc import Sequence
logger = logging.getLogger(__name__)
# Common stopwords for various languages (can be overridden with --stopwords)
DEFAULT_STOPWORDS_EN = frozenset(
@ -181,57 +193,210 @@ def load_stopwords(filepath: str | Path | None) -> frozenset[str]:
)
@dataclass(frozen=True)
class LessonConfig:
    """Configuration for learning lesson generation.

    Instances are frozen; derive a modified copy with
    ``dataclasses.replace`` instead of assigning to attributes.

    Attributes:
        batch_size: Number of words per learning batch.
        num_batches: Number of batches to generate.
        excerpt_length: Length of each practice excerpt, in words.
        excerpts_per_batch: Number of excerpts to find per batch.
        stopwords: Custom stopwords to skip. They are combined with the
            default English stopwords unless ``skip_default_stopwords``
            is set.
        skip_default_stopwords: If True, the default English stopwords
            are not applied and only ``stopwords`` is used.
        skip_numbers: If True, purely numeric words are filtered out.
        case_sensitive: If True, words are treated case-sensitively.
        translate_from: Source language code (e.g. 'la', 'pl'). The
            value "auto" requests language detection.
        translate_to: Target language code (e.g. 'en'). Lesson
            generation falls back to "en" when this is None.
    """

    batch_size: int = 20
    num_batches: int = 1
    excerpt_length: int = 30
    excerpts_per_batch: int = 3
    stopwords: frozenset[str] | None = None
    skip_default_stopwords: bool = False
    skip_numbers: bool = True
    case_sensitive: bool = False
    translate_from: str | None = None
    translate_to: str | None = None
def _resolve_stopwords(config: LessonConfig) -> frozenset[str]:
    """Build the effective stopword set for a lesson.

    Args:
        config: Lesson configuration supplying ``stopwords`` and
            ``skip_default_stopwords``.

    Returns:
        The custom stopwords alone when the defaults are skipped,
        otherwise the union of ``DEFAULT_STOPWORDS_EN`` and the custom
        stopwords. A missing custom set counts as empty.
    """
    custom = config.stopwords or frozenset()
    if config.skip_default_stopwords:
        return custom
    return DEFAULT_STOPWORDS_EN | custom
def _detect_translation_language(
    text: str,
    config: LessonConfig,
    lines: list[str],
) -> tuple[str | None, str | None]:
    """Work out the effective translation language pair.

    Detection runs when the configured source language is "auto", or
    when a target language is configured without a source language.

    Args:
        text: Source text handed to ``detect_language`` when detection
            is required.
        config: Lesson configuration holding ``translate_from`` and
            ``translate_to``.
        lines: Output lines of the lesson. Mutated in place: a
            "Detected language" line or a warning line is appended
            whenever detection is attempted.

    Returns:
        A ``(source, target)`` pair. The target falls back to "en" when
        the config does not set one. The source is None when detection
        was attempted and failed.
    """
    source_lang = config.translate_from
    target_lang = config.translate_to or "en"

    wants_detection = source_lang == "auto" or (
        config.translate_to and not source_lang
    )
    if not wants_detection:
        return source_lang, target_lang

    # NOTE(review): the pre-refactor code remarked that langdetect does
    # not support Latin (often reported as Italian); callers can pass an
    # explicit translate_from when detection looks wrong.
    detected = detect_language(text)
    if not detected:
        lines.append(
            "Warning: Could not detect language "
            "(install langdetect: "
            "pip install langdetect)"
        )
        return None, target_lang

    lines.append(f"Detected language: {detected}")
    return detected, target_lang
def _format_word_list(
    batch_words: list[tuple[str, int]],
    start_idx: int,
    total_words: int,
    translations: dict[str, str],
) -> list[str]:
    """Render one numbered vocabulary line per word in a batch.

    Args:
        batch_words: ``(word, count)`` pairs to list, in display order.
        start_idx: Zero-based rank of the first word; numbering starts
            at ``start_idx + 1``.
        total_words: Total word count of the source text, used as the
            denominator for each word's percentage.
        translations: Mapping of word to translation. When non-empty, a
            translation column is added to every line and words missing
            from the mapping are shown as "?".

    Returns:
        The formatted lines, one per entry of ``batch_words``.
    """
    rendered: list[str] = []
    rank = start_idx
    for word, count in batch_words:
        rank += 1
        share = (count / total_words) * 100
        entry = f" {rank:3}. {word:<20}"
        if translations:
            # The column appears for the whole batch as soon as any
            # translation exists; gaps are marked with "?".
            entry += f" -> {translations.get(word, '?'):<20}"
        rendered.append(
            f"{entry} ({count:,} occurrences, {share:.2f}%)"
        )
    return rendered
@dataclass(frozen=True)
class _LessonContext:
    """Shared context for batch generation.

    Bundles the values every batch section needs so they can be passed
    to ``_generate_batch_section`` as a single argument.

    Attributes:
        text: The source text being analyzed.
        word_counts: Occurrence count for each word of the source text.
        config: Lesson configuration used while rendering batches.
    """

    text: str
    word_counts: dict[str, int]
    config: LessonConfig
def _generate_batch_section(
    ctx: _LessonContext,
    batch_num: int,
    batch_words: list[tuple[str, int]],
    cumulative_words: list[str],
) -> list[str]:
    """Render the lesson lines for one vocabulary batch.

    The section consists of a header, the vocabulary list (with
    translations when both languages are configured), the cumulative
    text coverage, and the practice excerpts.

    Args:
        ctx: Shared lesson context (source text, word counts, config).
        batch_num: Zero-based index of the batch.
        batch_words: ``(word, count)`` pairs belonging to this batch.
        cumulative_words: All words introduced up to and including this
            batch; used for coverage and for excerpt selection.

    Returns:
        The lines making up this batch's section.
    """
    cfg = ctx.config
    counts = ctx.word_counts
    grand_total = sum(counts.values())
    first_rank = batch_num * cfg.batch_size
    # Clamp the displayed upper rank to the words actually in the batch.
    last_rank = min(
        first_rank + cfg.batch_size,
        first_rank + len(batch_words),
    )
    rule = "-" * 70

    section: list[str] = [
        rule,
        f"BATCH {batch_num + 1}: Words {first_rank + 1} - {last_rank}",
        rule,
        "",
    ]

    # Translate only when both ends of the language pair are known.
    source_lang = cfg.translate_from
    target_lang = cfg.translate_to
    glossary: dict[str, str] = {}
    if source_lang is not None and target_lang is not None:
        outcomes = translate_words_batch(
            [word for word, _ in batch_words],
            source_lang,
            target_lang,
        )
        glossary = {
            outcome.source_word: outcome.translated_word
            for outcome in outcomes
            if outcome.success
        }

    section += ["VOCABULARY TO LEARN:", ""]
    section += _format_word_list(
        batch_words, first_rank, grand_total, glossary,
    )
    section.append("")

    # Share of the text covered by every word learned so far.
    known_occurrences = sum(
        counts[word] for word in cumulative_words if word in counts
    )
    coverage = (known_occurrences / grand_total) * 100
    section += [
        "After learning these words, "
        f"you'll recognize ~{coverage:.1f}% of the text",
        "",
        "PRACTICE EXCERPTS:",
        "(Excerpts where your learned vocabulary "
        "is most concentrated)",
        "",
    ]

    best_excerpts = find_best_excerpt(
        ctx.text,
        cumulative_words,
        cfg.excerpt_length,
        case_sensitive=cfg.case_sensitive,
        top_n=cfg.excerpts_per_batch,
    )
    for position, found in enumerate(best_excerpts, 1):
        section += [
            f" Excerpt {position} "
            f"({found.match_percentage:.1f}% known words):",
            f' "{found.excerpt}"',
            "",
        ]

    return section
def generate_learning_lesson(
text: str,
*,
batch_size: int = 20,
num_batches: int = 1,
excerpt_length: int = 30,
excerpts_per_batch: int = 3,
stopwords: frozenset[str] | None = None,
skip_default_stopwords: bool = False,
skip_numbers: bool = True,
case_sensitive: bool = False,
context_words: int = 5,
translate_from: str | None = None,
translate_to: str | None = None,
config: LessonConfig | None = None,
) -> str:
"""Generate a learning lesson from text.
Args:
text: The source text to analyze.
batch_size: Number of words per learning batch.
num_batches: Number of batches to generate.
excerpt_length: Length of each excerpt in words.
excerpts_per_batch: Number of excerpts to find per batch.
stopwords: Custom stopwords to skip (in addition to defaults).
skip_default_stopwords: If True, don't filter out default English stopwords.
skip_numbers: If True, filter out numeric words (default: True).
case_sensitive: If True, treat words case-sensitively.
context_words: Words of context to include around excerpts.
translate_from: Source language code for translation (e.g., 'la', 'pl').
translate_to: Target language code for translation (e.g., 'en').
config: Lesson configuration. Uses defaults if None.
Returns:
Formatted learning lesson as a string.
"""
# Combine stopwords
all_stopwords: frozenset[str]
if skip_default_stopwords:
all_stopwords = stopwords or frozenset()
else:
all_stopwords = DEFAULT_STOPWORDS_EN | (stopwords or frozenset())
if config is None:
config = LessonConfig()
# Analyze text for word frequencies
word_counts = analyze_text(text, case_sensitive=case_sensitive)
all_stopwords = _resolve_stopwords(config)
word_counts = analyze_text(
text, case_sensitive=config.case_sensitive,
)
# Filter out stopwords and get sorted words
filtered_words = [
(word, count)
for word, count in word_counts.most_common()
if word.lower() not in all_stopwords
and len(word) > 1
and not (skip_numbers and word.isdigit())
and not (config.skip_numbers and word.isdigit())
]
total_words = sum(word_counts.values())
@ -241,125 +406,62 @@ def generate_learning_lesson(
lines.append("LANGUAGE LEARNING LESSON")
lines.append("=" * 70)
lines.append(
f"Source text: {total_words:,} total words, {len(word_counts):,} unique words"
f"Source text: {total_words:,} total words, "
f"{len(word_counts):,} unique words"
)
if all_stopwords:
lines.append(
f"After filtering {len(all_stopwords)} stopwords: {len(filtered_words):,} vocabulary words"
f"After filtering {len(all_stopwords)} "
f"stopwords: {len(filtered_words):,} "
"vocabulary words"
)
else:
lines.append(f"Vocabulary words: {len(filtered_words):,}")
lines.append(
f"Vocabulary words: {len(filtered_words):,}",
)
# Handle translation setup
actual_translate_from = translate_from
actual_translate_to = translate_to or "en" # Default to English
# Auto-detect language if translation is enabled but source not specified
if translate_from == "auto" or (translate_to and not translate_from):
detected = detect_language(text)
if detected:
actual_translate_from = detected
lines.append(f"Detected language: {detected}")
# Note: langdetect doesn't support Latin (often detected as Italian)
# If detection seems wrong, use --translate-from to override
else:
lines.append(
"Warning: Could not detect language "
"(install langdetect: pip install langdetect)"
)
actual_translate_from = None
do_translate = actual_translate_from is not None and actual_translate_to is not None
actual_from, actual_to = _detect_translation_language(
text, config, lines,
)
do_translate = (
actual_from is not None and actual_to is not None
)
if do_translate:
lines.append(f"Translation: {actual_translate_from} -> {actual_translate_to}")
lines.append(
f"Translation: {actual_from} -> {actual_to}",
)
lines.append("")
# Generate batches
# Create resolved config with detected translation
resolved_config = _replace_dc(
config,
translate_from=actual_from,
translate_to=actual_to,
)
ctx = _LessonContext(
text=text,
word_counts=word_counts,
config=resolved_config,
)
cumulative_words: list[str] = []
for batch_num in range(num_batches):
start_idx = batch_num * batch_size
end_idx = start_idx + batch_size
for batch_num in range(config.num_batches):
start_idx = batch_num * config.batch_size
end_idx = start_idx + config.batch_size
if start_idx >= len(filtered_words):
break
batch_words = filtered_words[start_idx:end_idx]
cumulative_words.extend(word for word, _ in batch_words)
lines.append("-" * 70)
lines.append(
f"BATCH {batch_num + 1}: Words {start_idx + 1} - {min(end_idx, len(filtered_words))}"
)
lines.append("-" * 70)
lines.append("")
# Get translations if requested
translations: dict[str, str] = {}
if do_translate:
words_to_translate = [word for word, _ in batch_words]
translation_results = translate_words_batch(
words_to_translate,
actual_translate_from, # type: ignore[arg-type]
actual_translate_to, # type: ignore[arg-type]
lines.extend(
_generate_batch_section(
ctx,
batch_num,
batch_words,
cumulative_words,
)
translations = {
r.source_word: r.translated_word
for r in translation_results
if r.success
}
# Word list with frequencies
lines.append("VOCABULARY TO LEARN:")
lines.append("")
if do_translate and translations:
# Include translations in output
for i, (word, count) in enumerate(batch_words, start=start_idx + 1):
percentage = (count / total_words) * 100
trans = translations.get(word, "?")
lines.append(
f" {i:3}. {word:<20} -> {trans:<20} ({count:,} occurrences, {percentage:.2f}%)"
)
else:
for i, (word, count) in enumerate(batch_words, start=start_idx + 1):
percentage = (count / total_words) * 100
lines.append(
f" {i:3}. {word:<20} ({count:,} occurrences, {percentage:.2f}%)"
)
lines.append("")
# Calculate cumulative coverage
cumulative_count = sum(
word_counts[word] for word in cumulative_words if word in word_counts
)
coverage = (cumulative_count / total_words) * 100
lines.append(
f"After learning these words, you'll recognize ~{coverage:.1f}% of the text"
)
lines.append("")
# Find excerpts using cumulative words
lines.append("PRACTICE EXCERPTS:")
lines.append("(Excerpts where your learned vocabulary is most concentrated)")
lines.append("")
excerpts = find_best_excerpt(
text,
cumulative_words,
excerpt_length,
case_sensitive=case_sensitive,
top_n=excerpts_per_batch,
)
for j, excerpt in enumerate(excerpts, 1):
lines.append(
f" Excerpt {j} ({excerpt.match_percentage:.1f}% known words):"
)
lines.append(f' "{excerpt.excerpt}"')
lines.append("")
# Summary
lines.append("=" * 70)
@ -368,14 +470,25 @@ def generate_learning_lesson(
if cumulative_words:
final_coverage = sum(
word_counts[word] for word in cumulative_words if word in word_counts
word_counts[w]
for w in cumulative_words
if w in word_counts
)
final_percentage = (final_coverage / total_words) * 100
lines.append(f"Total vocabulary words learned: {len(cumulative_words)}")
lines.append(f"Text coverage: {final_percentage:.1f}%")
final_pct = (final_coverage / total_words) * 100
lines.append(
"Total vocabulary words learned: "
f"{len(cumulative_words)}"
)
lines.append(f"Text coverage: {final_pct:.1f}%")
lines.append("")
lines.append("TIP: Focus on understanding the excerpts first, then read")
lines.append("more of the original text as your vocabulary grows!")
lines.append(
"TIP: Focus on understanding the excerpts "
"first, then read"
)
lines.append(
"more of the original text as your "
"vocabulary grows!"
)
return "\n".join(lines)
@ -475,7 +588,10 @@ def main(argv: Sequence[str] | None = None) -> int:
"--translate-from",
type=str,
metavar="LANG",
help="Source language code (e.g., 'la', 'pl', 'de'). If omitted, auto-detected.",
help=(
"Source language code (e.g., 'la', 'pl'). "
"If omitted, auto-detected."
),
)
parser.add_argument(
"--translate-to",
@ -496,27 +612,22 @@ def main(argv: Sequence[str] | None = None) -> int:
args = parser.parse_args(argv)
try:
# Get input text
if args.text:
text = args.text
else:
text = read_file(args.file)
text = args.text or read_file(args.file)
# Load custom stopwords if provided
custom_stopwords = load_stopwords(args.stopwords)
# Determine translation settings
# Translation enabled by default, --no-translate disables it
translate_from: str | None = None
translate_to: str | None = None
if not args.no_translate:
translate_from = args.translate_from or "auto" # "auto" triggers detection
translate_from = (
args.translate_from or "auto"
)
translate_to = args.translate_to
# Generate lesson
lesson = generate_learning_lesson(
text,
config = LessonConfig(
batch_size=args.batch_size,
num_batches=args.batches,
excerpt_length=args.excerpt_length,
@ -528,19 +639,26 @@ def main(argv: Sequence[str] | None = None) -> int:
translate_from=translate_from,
translate_to=translate_to,
)
lesson = generate_learning_lesson(text, config)
# Output
if args.output:
Path(args.output).write_text(lesson, encoding="utf-8")
print(f"Lesson written to {args.output}")
Path(args.output).write_text(
lesson, encoding="utf-8",
)
logger.info(
"Lesson written to %s", args.output,
)
else:
print(lesson)
logger.info(lesson)
except FileNotFoundError as e:
print(f"Error: File not found - {e}", file=sys.stderr)
except FileNotFoundError:
logger.exception("Error: File not found")
return 1
except UnicodeDecodeError as e:
print(f"Error: Could not decode file as UTF-8 - {e}", file=sys.stderr)
except UnicodeDecodeError:
logger.exception(
"Error: Could not decode file as UTF-8",
)
return 1
return 0

View File

@ -3,8 +3,11 @@
from __future__ import annotations
from collections import Counter
from pathlib import Path
import time
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from pathlib import Path
import pytest
@ -251,12 +254,13 @@ class TestMain:
assert exit_code == 0
assert "Unique words: 3" in captured.out
def test_file_not_found_error(self, capsys: pytest.CaptureFixture[str]) -> None:
def test_file_not_found_error(
self, caplog: pytest.LogCaptureFixture
) -> None:
"""Test error handling for missing file."""
exit_code = main(["--file", "/nonexistent/file.txt"])
captured = capsys.readouterr()
assert exit_code == 1
assert "Error" in captured.err
assert "File not found" in caplog.text
class TestPerformance:
@ -283,7 +287,7 @@ class TestPerformance:
assert elapsed < 10.0, f"Analysis took {elapsed:.2f}s, expected < 10s"
assert "word0" in result # Most common word should be present
def test_bible_sized_text_performance(self, tmp_path: Path) -> None:
def test_bible_sized_text_performance(self) -> None:
"""Test with Bible-sized text (~800k words)."""
# Generate text similar in size to the Bible
base_words = ["the", "and", "of", "to", "in", "a", "that", "is", "was", "for"]

View File

@ -10,6 +10,7 @@ import pytest
try:
from python_pkg.word_frequency.anki_generator import (
DeckInput,
find_word_contexts,
generate_anki_deck,
main,
@ -20,6 +21,7 @@ except ImportError:
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))
from python_pkg.word_frequency.anki_generator import (
DeckInput,
find_word_contexts,
generate_anki_deck,
main,
@ -77,7 +79,7 @@ class TestParseVocabularyCurveOutput:
def test_parse_length_1(self, sample_vocabulary_output: str) -> None:
"""Test parsing output for length 1."""
excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output(
excerpt, excerpt_words, _all_vocab = parse_vocabulary_curve_output(
sample_vocabulary_output, 1
)
assert excerpt == "the"
@ -85,7 +87,7 @@ class TestParseVocabularyCurveOutput:
def test_parse_length_2(self, sample_vocabulary_output: str) -> None:
"""Test parsing output for length 2."""
excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output(
excerpt, excerpt_words, _all_vocab = parse_vocabulary_curve_output(
sample_vocabulary_output, 2
)
assert excerpt == "the dog"
@ -93,7 +95,7 @@ class TestParseVocabularyCurveOutput:
def test_parse_length_3(self, sample_vocabulary_output: str) -> None:
"""Test parsing output for length 3."""
excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output(
excerpt, excerpt_words, _all_vocab = parse_vocabulary_curve_output(
sample_vocabulary_output, 3
)
assert excerpt == "the quick fox"
@ -104,7 +106,7 @@ class TestParseVocabularyCurveOutput:
def test_parse_nonexistent_length(self, sample_vocabulary_output: str) -> None:
"""Test parsing output for non-existent length."""
excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output(
excerpt, excerpt_words, _all_vocab = parse_vocabulary_curve_output(
sample_vocabulary_output, 100
)
assert excerpt == ""
@ -121,7 +123,7 @@ hello;1
world;2
VOCAB_DUMP_END
"""
excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output(output, 2)
_excerpt, _excerpt_words, all_vocab = parse_vocabulary_curve_output(output, 2)
assert all_vocab == [("hello", 1), ("world", 2)]
@ -168,10 +170,12 @@ class TestGenerateAnkiDeck:
MagicMock(success=True, source_word="hello", translated_word="hola")
]
result = generate_anki_deck(
[("hello", 1)],
source_lang="en",
target_lang="es",
deck_name="TestDeck",
DeckInput(
words_with_ranks=[("hello", 1)],
source_lang="en",
target_lang="es",
deck_name="TestDeck",
),
)
assert "#separator:semicolon" in result
@ -188,9 +192,11 @@ class TestGenerateAnkiDeck:
MagicMock(success=True, source_word="world", translated_word="mundo"),
]
result = generate_anki_deck(
[("hello", 1), ("world", 2)],
source_lang="en",
target_lang="es",
DeckInput(
words_with_ranks=[("hello", 1), ("world", 2)],
source_lang="en",
target_lang="es",
),
)
# Check that words and translations are present
@ -208,9 +214,11 @@ class TestGenerateAnkiDeck:
MagicMock(success=True, source_word="test", translated_word="prueba")
]
result = generate_anki_deck(
[("test", 42)],
source_lang="en",
target_lang="es",
DeckInput(
words_with_ranks=[("test", 42)],
source_lang="en",
target_lang="es",
),
)
assert "#42" in result
@ -226,9 +234,11 @@ class TestGenerateAnkiDeck:
)
]
result = generate_anki_deck(
[("test;word", 1)],
source_lang="en",
target_lang="es",
DeckInput(
words_with_ranks=[("test;word", 1)],
source_lang="en",
target_lang="es",
),
)
# Semicolons should be replaced with commas
@ -244,10 +254,12 @@ class TestGenerateAnkiDeck:
]
contexts = {"hello": "...say hello to..."}
result = generate_anki_deck(
[("hello", 1)],
source_lang="en",
target_lang="es",
contexts=contexts,
DeckInput(
words_with_ranks=[("hello", 1)],
source_lang="en",
target_lang="es",
contexts=contexts,
),
include_context=True,
)
@ -257,9 +269,11 @@ class TestGenerateAnkiDeck:
def test_no_translate_flag(self) -> None:
"""Test that no_translate skips translation."""
result = generate_anki_deck(
[("hello", 1), ("world", 2)],
source_lang="en",
target_lang="es",
DeckInput(
words_with_ranks=[("hello", 1), ("world", 2)],
source_lang="en",
target_lang="es",
),
no_translate=True,
)
@ -280,7 +294,7 @@ class TestMain:
result = main(["--file", "nonexistent.txt", "--length", "10"])
assert result == 1
def test_help_flag(self, capsys: pytest.CaptureFixture[str]) -> None:
def test_help_flag(self) -> None:
"""Test that --help works."""
with pytest.raises(SystemExit) as exc_info:
main(["--help"])
@ -309,7 +323,7 @@ class TestIntegration:
) as mock_translate:
# Mock translation to avoid network calls
def mock_translate_fn(
words: list[str], from_lang: str, to_lang: str
words: list[str], _from_lang: str, _to_lang: str
) -> list[MagicMock]:
return [
MagicMock(success=True, source_word=w, translated_word=f"[{w}]")
@ -324,6 +338,8 @@ class TestIntegration:
str(sample_text_file),
"--length",
"5",
"--from",
"en",
"--output",
str(output_file),
"--quiet",
@ -337,9 +353,11 @@ class TestIntegration:
assert "#separator:semicolon" in content
def test_cli_with_sample_file(
self, sample_text_file: Path, tmp_path: Path, capsys: pytest.CaptureFixture[str]
self, sample_text_file: Path, tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
"""Test CLI with actual file."""
import logging
from python_pkg.word_frequency.anki_generator import C_EXECUTABLE
if not C_EXECUTABLE.exists():
@ -347,9 +365,12 @@ class TestIntegration:
output_file = tmp_path / "anki_output.txt"
with patch(
"python_pkg.word_frequency.anki_generator.translate_words_batch"
) as mock_translate:
with (
caplog.at_level(logging.INFO),
patch(
"python_pkg.word_frequency.anki_generator.translate_words_batch"
) as mock_translate,
):
mock_translate.return_value = [
MagicMock(success=True, source_word="the", translated_word="le")
]
@ -360,14 +381,15 @@ class TestIntegration:
str(sample_text_file),
"--length",
"1",
"--from",
"en",
"--output",
str(output_file),
]
)
assert result == 0
captured = capsys.readouterr()
assert "FLASHCARD GENERATION COMPLETE" in captured.out
assert "FLASHCARD GENERATION COMPLETE" in caplog.text
if __name__ == "__main__":

View File

@ -2,13 +2,18 @@
from __future__ import annotations
from pathlib import Path
import logging
import time
from typing import TYPE_CHECKING
import pytest
if TYPE_CHECKING:
from pathlib import Path
from python_pkg.word_frequency.excerpt_finder import (
ExcerptResult,
ExcerptSearchOptions,
find_best_excerpt,
find_best_excerpt_with_context,
format_excerpt_results,
@ -146,7 +151,8 @@ class TestFindBestExcerptWithContext:
"""Test with zero context (should behave like find_best_excerpt)."""
text = "a b c d e f g"
result = find_best_excerpt_with_context(
text, ["c"], excerpt_length=1, context_words=0
text, ["c"], excerpt_length=1,
options=ExcerptSearchOptions(context_words=0),
)
assert result[0].excerpt == "c"
@ -155,7 +161,8 @@ class TestFindBestExcerptWithContext:
"""Test with context words."""
text = "a b c d e f g"
result = find_best_excerpt_with_context(
text, ["d"], excerpt_length=1, context_words=2
text, ["d"], excerpt_length=1,
options=ExcerptSearchOptions(context_words=2),
)
# "d" at index 3, with context should include 2 words before and after
@ -167,7 +174,8 @@ class TestFindBestExcerptWithContext:
"""Test context doesn't go before start of text."""
text = "a b c d e"
result = find_best_excerpt_with_context(
text, ["a"], excerpt_length=1, context_words=3
text, ["a"], excerpt_length=1,
options=ExcerptSearchOptions(context_words=3),
)
# Can't go before "a", so just get words after
@ -178,7 +186,8 @@ class TestFindBestExcerptWithContext:
"""Test context doesn't go beyond end of text."""
text = "a b c d e"
result = find_best_excerpt_with_context(
text, ["e"], excerpt_length=1, context_words=3
text, ["e"], excerpt_length=1,
options=ExcerptSearchOptions(context_words=3),
)
# Can't go beyond "e"
@ -240,33 +249,33 @@ class TestFormatExcerptResults:
class TestMain:
"""Tests for main CLI function."""
def test_text_and_words_input(self, capsys: pytest.CaptureFixture[str]) -> None:
def test_text_and_words_input(self, caplog: pytest.LogCaptureFixture) -> None:
"""Test --text and --words options."""
exit_code = main(
["--text", "hello world hello", "--words", "hello", "--length", "2"]
)
captured = capsys.readouterr()
with caplog.at_level(logging.INFO):
exit_code = main(
["--text", "hello world hello", "--words", "hello", "--length", "2"]
)
assert exit_code == 0
assert "hello" in captured.out
assert "hello" in caplog.text
def test_file_input(
self, tmp_path: Path, capsys: pytest.CaptureFixture[str]
self, tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
"""Test --file input option."""
test_file = tmp_path / "test.txt"
test_file.write_text("hello world hello world", encoding="utf-8")
exit_code = main(
["--file", str(test_file), "--words", "hello", "--length", "2"]
)
captured = capsys.readouterr()
with caplog.at_level(logging.INFO):
exit_code = main(
["--file", str(test_file), "--words", "hello", "--length", "2"]
)
assert exit_code == 0
assert "hello" in captured.out
assert "hello" in caplog.text
def test_words_file_input(
self, tmp_path: Path, capsys: pytest.CaptureFixture[str]
self, tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
"""Test --words-file option."""
text_file = tmp_path / "text.txt"
@ -274,91 +283,91 @@ class TestMain:
text_file.write_text("hello world hello world", encoding="utf-8")
words_file.write_text("hello\nworld\n", encoding="utf-8")
exit_code = main(
[
"--file",
str(text_file),
"--words-file",
str(words_file),
"--length",
"2",
]
)
captured = capsys.readouterr()
with caplog.at_level(logging.INFO):
exit_code = main(
[
"--file",
str(text_file),
"--words-file",
str(words_file),
"--length",
"2",
]
)
assert exit_code == 0
assert "100.00%" in captured.out # Both words match
assert "100.00%" in caplog.text # Both words match
def test_top_option(self, capsys: pytest.CaptureFixture[str]) -> None:
def test_top_option(self, caplog: pytest.LogCaptureFixture) -> None:
"""Test --top option."""
exit_code = main(
[
"--text",
"a b c d e f",
"--words",
"a",
"b",
"--length",
"2",
"--top",
"3",
]
)
captured = capsys.readouterr()
with caplog.at_level(logging.INFO):
exit_code = main(
[
"--text",
"a b c d e f",
"--words",
"a",
"b",
"--length",
"2",
"--top",
"3",
]
)
assert exit_code == 0
# Should show multiple results
assert "Result #1" in captured.out
assert "Result #1" in caplog.text
def test_context_option(self, capsys: pytest.CaptureFixture[str]) -> None:
def test_context_option(self, caplog: pytest.LogCaptureFixture) -> None:
"""Test --context option."""
exit_code = main(
[
"--text",
"a b c d e f g",
"--words",
"d",
"--length",
"1",
"--context",
"2",
]
)
capsys.readouterr()
with caplog.at_level(logging.INFO):
exit_code = main(
[
"--text",
"a b c d e f g",
"--words",
"d",
"--length",
"1",
"--context",
"2",
]
)
assert exit_code == 0
# Excerpt should include context words
def test_case_sensitive_option(self, capsys: pytest.CaptureFixture[str]) -> None:
def test_case_sensitive_option(self, caplog: pytest.LogCaptureFixture) -> None:
"""Test --case-sensitive option."""
exit_code = main(
[
"--text",
"Hello HELLO hello",
"--words",
"hello",
"--length",
"1",
"--case-sensitive",
]
)
capsys.readouterr()
with caplog.at_level(logging.INFO):
exit_code = main(
[
"--text",
"Hello HELLO hello",
"--words",
"hello",
"--length",
"1",
"--case-sensitive",
]
)
assert exit_code == 0
# Only lowercase "hello" should match
def test_file_not_found(self, capsys: pytest.CaptureFixture[str]) -> None:
def test_file_not_found(self, caplog: pytest.LogCaptureFixture) -> None:
"""Test error handling for missing file."""
exit_code = main(
["--file", "/nonexistent/file.txt", "--words", "hello", "--length", "2"]
)
captured = capsys.readouterr()
with caplog.at_level(logging.ERROR):
exit_code = main(
["--file", "/nonexistent/file.txt", "--words", "hello", "--length", "2"]
)
assert exit_code == 1
assert "Error" in captured.err
assert "Error" in caplog.text
def test_empty_words_file(
self, tmp_path: Path, capsys: pytest.CaptureFixture[str]
self, tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
"""Test error when words file is empty."""
text_file = tmp_path / "text.txt"
@ -366,20 +375,20 @@ class TestMain:
text_file.write_text("hello world", encoding="utf-8")
words_file.write_text("", encoding="utf-8")
exit_code = main(
[
"--file",
str(text_file),
"--words-file",
str(words_file),
"--length",
"2",
]
)
captured = capsys.readouterr()
with caplog.at_level(logging.ERROR):
exit_code = main(
[
"--file",
str(text_file),
"--words-file",
str(words_file),
"--length",
"2",
]
)
assert exit_code == 1
assert "No target words" in captured.err
assert "No target words" in caplog.text
class TestPerformance:

View File

@ -2,16 +2,20 @@
from __future__ import annotations
from pathlib import Path
import logging
import time
from typing import TYPE_CHECKING
from unittest.mock import MagicMock, patch
import pytest
if TYPE_CHECKING:
from pathlib import Path
import python_pkg.word_frequency.learning_pipe as learning_pipe_module
from python_pkg.word_frequency.learning_pipe import (
DEFAULT_STOPWORDS_EN,
LessonConfig,
generate_learning_lesson,
load_stopwords,
main,
@ -23,7 +27,7 @@ if TYPE_CHECKING:
@pytest.fixture
def mock_translation() -> Generator[MagicMock, None, None]:
def _mock_translation() -> Generator[MagicMock, None, None]:
"""Mock translation to avoid requiring argostranslate."""
def fake_batch_translate(
@ -31,7 +35,7 @@ def mock_translation() -> Generator[MagicMock, None, None]:
from_lang: str,
to_lang: str,
*,
use_cache: bool = True,
_use_cache: bool = True,
) -> list[TranslationResult]:
"""Fake batch translation that returns word with prefix."""
return [
@ -95,7 +99,7 @@ class TestGenerateLearningLesson:
"""Test basic lesson generation."""
text = "hello world hello hello world test test test test"
result = generate_learning_lesson(
text, batch_size=3, num_batches=1, skip_default_stopwords=True
text, LessonConfig(batch_size=3, num_batches=1, skip_default_stopwords=True)
)
assert "LANGUAGE LEARNING LESSON" in result
@ -106,7 +110,7 @@ class TestGenerateLearningLesson:
"""Test generation with multiple batches."""
text = " ".join(f"word{i}" * (100 - i) for i in range(20))
result = generate_learning_lesson(
text, batch_size=5, num_batches=3, skip_default_stopwords=True
text, LessonConfig(batch_size=5, num_batches=3, skip_default_stopwords=True)
)
assert "BATCH 1" in result
@ -116,7 +120,9 @@ class TestGenerateLearningLesson:
def test_stopwords_filtering(self) -> None:
"""Test that default stopwords are filtered."""
text = "the the the hello world"
result = generate_learning_lesson(text, batch_size=5, num_batches=1)
result = generate_learning_lesson(
text, LessonConfig(batch_size=5, num_batches=1)
)
# "the" should be filtered, "hello" and "world" should appear
lines = result.split("\n")
@ -139,7 +145,7 @@ class TestGenerateLearningLesson:
"""Test disabling default stopword filtering."""
text = "the the the hello"
result = generate_learning_lesson(
text, batch_size=5, num_batches=1, skip_default_stopwords=True
text, LessonConfig(batch_size=5, num_batches=1, skip_default_stopwords=True)
)
assert "the" in result.lower()
@ -148,7 +154,7 @@ class TestGenerateLearningLesson:
"""Test that numbers are filtered by default."""
text = "123 123 123 hello world"
result = generate_learning_lesson(
text, batch_size=5, num_batches=1, skip_default_stopwords=True
text, LessonConfig(batch_size=5, num_batches=1, skip_default_stopwords=True)
)
# Check vocabulary section doesn't include "123"
@ -162,10 +168,12 @@ class TestGenerateLearningLesson:
text = "123 123 123 hello"
result = generate_learning_lesson(
text,
batch_size=5,
num_batches=1,
skip_default_stopwords=True,
skip_numbers=False,
LessonConfig(
batch_size=5,
num_batches=1,
skip_default_stopwords=True,
skip_numbers=False,
),
)
assert "123" in result
@ -174,7 +182,7 @@ class TestGenerateLearningLesson:
"""Test that coverage percentage is calculated."""
text = "hello hello hello world world test"
result = generate_learning_lesson(
text, batch_size=3, num_batches=1, skip_default_stopwords=True
text, LessonConfig(batch_size=3, num_batches=1, skip_default_stopwords=True)
)
assert "recognize" in result.lower()
@ -185,11 +193,13 @@ class TestGenerateLearningLesson:
text = "hello world hello world hello world test test test"
result = generate_learning_lesson(
text,
batch_size=2,
num_batches=1,
excerpt_length=3,
excerpts_per_batch=2,
skip_default_stopwords=True,
LessonConfig(
batch_size=2,
num_batches=1,
excerpt_length=3,
excerpts_per_batch=2,
skip_default_stopwords=True,
),
)
assert "PRACTICE EXCERPTS" in result
@ -200,45 +210,45 @@ class TestMain:
"""Tests for main CLI function."""
def test_basic_text_input(
self, capsys: pytest.CaptureFixture[str], mock_translation: None
self, caplog: pytest.LogCaptureFixture, _mock_translation: None
) -> None:
"""Test with text input."""
exit_code = main(
[
"--text",
"hello world hello world test test test",
"--batch-size",
"3",
"--no-default-stopwords",
]
)
captured = capsys.readouterr()
with caplog.at_level(logging.INFO):
exit_code = main(
[
"--text",
"hello world hello world test test test",
"--batch-size",
"3",
"--no-default-stopwords",
]
)
assert exit_code == 0
assert "LANGUAGE LEARNING LESSON" in captured.out
assert "LANGUAGE LEARNING LESSON" in caplog.text
def test_file_input(
self, tmp_path: Path, capsys: pytest.CaptureFixture[str], mock_translation: None
self, tmp_path: Path, caplog: pytest.LogCaptureFixture, _mock_translation: None
) -> None:
"""Test with file input."""
test_file = tmp_path / "test.txt"
test_file.write_text("hello world hello world test", encoding="utf-8")
exit_code = main(
[
"--file",
str(test_file),
"--batch-size",
"3",
"--no-default-stopwords",
]
)
captured = capsys.readouterr()
with caplog.at_level(logging.INFO):
exit_code = main(
[
"--file",
str(test_file),
"--batch-size",
"3",
"--no-default-stopwords",
]
)
assert exit_code == 0
assert "hello" in captured.out.lower()
assert "hello" in caplog.text.lower()
def test_output_to_file(self, tmp_path: Path, mock_translation: None) -> None:
def test_output_to_file(self, tmp_path: Path, _mock_translation: None) -> None:
"""Test outputting to file."""
output_file = tmp_path / "lesson.txt"
@ -258,7 +268,7 @@ class TestMain:
assert "LANGUAGE LEARNING LESSON" in content
def test_custom_stopwords(
self, tmp_path: Path, capsys: pytest.CaptureFixture[str], mock_translation: None
self, tmp_path: Path, _mock_translation: None
) -> None:
"""Test with custom stopwords file."""
stopwords_file = tmp_path / "stop.txt"
@ -275,41 +285,40 @@ class TestMain:
"5",
]
)
capsys.readouterr()
assert exit_code == 0
# "hello" should be filtered by custom stopwords
def test_multiple_batches_option(
self, capsys: pytest.CaptureFixture[str], mock_translation: None
self, caplog: pytest.LogCaptureFixture, _mock_translation: None
) -> None:
"""Test --batches option."""
text = " ".join(f"word{i}" * (50 - i) for i in range(30))
exit_code = main(
[
"--text",
text,
"--batch-size",
"5",
"--batches",
"3",
"--no-default-stopwords",
]
)
captured = capsys.readouterr()
with caplog.at_level(logging.INFO):
exit_code = main(
[
"--text",
text,
"--batch-size",
"5",
"--batches",
"3",
"--no-default-stopwords",
]
)
assert exit_code == 0
assert "BATCH 1" in captured.out
assert "BATCH 2" in captured.out
assert "BATCH 3" in captured.out
assert "BATCH 1" in caplog.text
assert "BATCH 2" in caplog.text
assert "BATCH 3" in caplog.text
def test_file_not_found(self, capsys: pytest.CaptureFixture[str]) -> None:
def test_file_not_found(self, caplog: pytest.LogCaptureFixture) -> None:
"""Test error handling for missing file."""
exit_code = main(["--file", "/nonexistent/file.txt"])
captured = capsys.readouterr()
with caplog.at_level(logging.ERROR):
exit_code = main(["--file", "/nonexistent/file.txt"])
assert exit_code == 1
assert "Error" in captured.err
assert "Error" in caplog.text
class TestPerformance:
@ -324,10 +333,12 @@ class TestPerformance:
start_time = time.perf_counter()
result = generate_learning_lesson(
large_text,
batch_size=50,
num_batches=5,
excerpt_length=30,
skip_default_stopwords=True,
LessonConfig(
batch_size=50,
num_batches=5,
excerpt_length=30,
skip_default_stopwords=True,
),
)
elapsed = time.perf_counter() - start_time
@ -358,9 +369,11 @@ class TestTranslationIntegration:
text = "hello world hello world hello"
result = generate_learning_lesson(
text,
batch_size=5,
num_batches=1,
skip_default_stopwords=True,
LessonConfig(
batch_size=5,
num_batches=1,
skip_default_stopwords=True,
),
)
assert "hello" in result
@ -368,17 +381,19 @@ class TestTranslationIntegration:
# Should not have translation arrows
assert " -> " not in result or "Translation" not in result
def test_lesson_with_translation_params(self, mock_translation: None) -> None:
def test_lesson_with_translation_params(self, _mock_translation: None) -> None:
"""Test that translation params are accepted."""
text = "hello world hello world hello"
# This should work with mocked translation
result = generate_learning_lesson(
text,
batch_size=5,
num_batches=1,
skip_default_stopwords=True,
translate_from="en",
translate_to="es",
LessonConfig(
batch_size=5,
num_batches=1,
skip_default_stopwords=True,
translate_from="en",
translate_to="es",
),
)
# The lesson should still be generated
@ -386,7 +401,7 @@ class TestTranslationIntegration:
assert "hello" in result
def test_main_with_translate_flags(
self, tmp_path: Path, mock_translation: None
self, tmp_path: Path, _mock_translation: None
) -> None:
"""Test that main accepts translation flags."""
text_file = tmp_path / "test.txt"
@ -408,36 +423,42 @@ class TestTranslationIntegration:
assert result == 0
def test_translate_to_defaults_to_english(
self, capsys: pytest.CaptureFixture[str], mock_translation: None
self, _mock_translation: None
) -> None:
"""Test that translate_to defaults to 'en' when using auto-detection."""
text = "hello world"
# When using --translate flag (translate_from="auto"), translate_to defaults to "en"
result = generate_learning_lesson(
text,
batch_size=5,
num_batches=1,
skip_default_stopwords=True,
translate_from="auto", # Auto-detect source language
translate_to=None, # Should default to English
)
# When using --translate flag (translate_from="auto"),
# translate_to defaults to "en"
with patch.object(
learning_pipe_module, "detect_language", return_value="es"
):
result = generate_learning_lesson(
text,
LessonConfig(
batch_size=5,
num_batches=1,
skip_default_stopwords=True,
translate_from="auto", # Auto-detect source language
translate_to=None, # Should default to English
),
)
# Should have translation output with auto-detected source -> en
assert "Detected language:" in result
assert " -> en" in result
def test_no_translation_when_both_none(
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""Test no translation happens when both translate_from and translate_to are None."""
def test_no_translation_when_both_none(self) -> None:
"""Test no translation when both translate params are None."""
text = "hello world"
result = generate_learning_lesson(
text,
batch_size=5,
num_batches=1,
skip_default_stopwords=True,
translate_from=None,
translate_to=None,
LessonConfig(
batch_size=5,
num_batches=1,
skip_default_stopwords=True,
translate_from=None,
translate_to=None,
),
)
# Should not have translation output

View File

@ -61,19 +61,16 @@ class ArgosAvailableMock:
self.mock_translate_module = MagicMock()
self.mock_package_module = MagicMock()
self.mock_parent = MagicMock()
self.original_available = translator._argos_available
self._sys_modules_patcher: MagicMock | None = None
self._ensure_patcher: MagicMock | None = None
self._lang_patcher: MagicMock | None = None
self._check_argos_patcher: MagicMock | None = None
self._argos_module_patcher: MagicMock | None = None
def __enter__(self) -> MagicMock:
"""Set up the mocks."""
translator._argos_available = True
# Set up translate return value
if isinstance(self.translate_returns, Exception) or isinstance(
self.translate_returns, list
):
if isinstance(self.translate_returns, (Exception, list)):
self.mock_translate_fn.side_effect = self.translate_returns
elif self.translate_returns is not None:
self.mock_translate_fn.return_value = self.translate_returns
@ -96,41 +93,52 @@ class ArgosAvailableMock:
},
)
# Patch the module-level argostranslate reference in translator
self._argos_module_patcher = patch.object(
translator, "argostranslate", self.mock_parent, create=True
)
# Patch _ensure_argos_installed and _ensure_language_pair to no-op
self._ensure_patcher = patch.object(
translator, "_ensure_argos_installed", lambda: None
)
self._lang_patcher = patch.object(
translator, "_ensure_language_pair", lambda f, t: None
translator, "_ensure_language_pair", lambda _f, _t: None
)
self._check_argos_patcher = patch.object(
translator, "_check_argos", return_value=True
)
self._sys_modules_patcher.start() # type: ignore[union-attr]
self._argos_module_patcher.start() # type: ignore[union-attr]
self._ensure_patcher.start() # type: ignore[union-attr]
self._lang_patcher.start() # type: ignore[union-attr]
self._check_argos_patcher.start() # type: ignore[union-attr]
return self.mock_translate_fn
def __exit__(self, *args: object) -> None:
"""Restore original state."""
if self._check_argos_patcher:
self._check_argos_patcher.stop()
if self._lang_patcher:
self._lang_patcher.stop()
if self._ensure_patcher:
self._ensure_patcher.stop()
if self._argos_module_patcher:
self._argos_module_patcher.stop()
if self._sys_modules_patcher:
self._sys_modules_patcher.stop()
translator._argos_available = self.original_available
# Fixtures
@pytest.fixture
def mock_argos_unavailable() -> Generator[None, None, None]:
def _mock_argos_unavailable() -> Generator[None, None, None]:
"""Mock argostranslate being unavailable (for legacy tests)."""
original_value = translator._argos_available
translator._argos_available = False
yield
translator._argos_available = original_value
with patch.object(translator, "_check_argos", return_value=False):
yield
@pytest.fixture
@ -178,7 +186,7 @@ class TestTranslationResult:
def test_result_is_tuple(self) -> None:
"""Test that TranslationResult is a namedtuple."""
result = TranslationResult("a", "b", "en", "es", True)
result = TranslationResult("a", "b", "en", "es", success=True)
assert isinstance(result, tuple)
assert len(result) == 6
@ -192,13 +200,15 @@ class TestTranslateWord:
def test_translate_word_argos_unavailable_raises(self) -> None:
"""Test that translation raises ImportError when argos is unavailable."""
# Mock _ensure_argos_installed to raise ImportError
with patch.object(
translator,
"_ensure_argos_installed",
side_effect=ImportError("argostranslate not available"),
with (
patch.object(
translator,
"_ensure_argos_installed",
side_effect=ImportError("argostranslate not available"),
),
pytest.raises(ImportError, match="argostranslate not available"),
):
with pytest.raises(ImportError, match="argostranslate not available"):
translate_word("hello", "en", "es", use_cache=False)
translate_word("hello", "en", "es", use_cache=False)
def test_translate_word_success(self) -> None:
"""Test successful word translation."""
@ -243,13 +253,15 @@ class TestTranslateWords:
def test_translate_words_argos_unavailable_raises(self) -> None:
"""Test that translating words raises ImportError when argos unavailable."""
with patch.object(
translator,
"_ensure_argos_installed",
side_effect=ImportError("argostranslate not available"),
with (
patch.object(
translator,
"_ensure_argos_installed",
side_effect=ImportError("argostranslate not available"),
),
pytest.raises(ImportError, match="argostranslate not available"),
):
with pytest.raises(ImportError, match="argostranslate not available"):
translate_words(["hello", "world"], "en", "es", use_cache=False)
translate_words(["hello", "world"], "en", "es", use_cache=False)
# translate_words_batch tests
@ -290,7 +302,7 @@ class TestTranslateWordsBatch:
assert results[4].translated_word == "cinco"
def test_batch_fallback_on_mismatch(self) -> None:
"""Test batch translation falls back to individual when result count mismatches."""
"""Test batch falls back to individual on result count mismatch."""
words = ["one", "two", "three", "four"]
# First call (batch) returns wrong count, subsequent calls are individual
with ArgosAvailableMock(["wrong", "uno", "dos", "tres", "cuatro"]) as mock:
@ -313,10 +325,11 @@ class TestTranslateWordsBatch:
mock_parent.translate = mock_translate_module
mock_parent.package = mock_package_module
original = translator._argos_available
translator._argos_available = True
with (
patch.object(translator, "_check_argos", return_value=True),
patch.object(
translator, "argostranslate", mock_parent, create=True
),
patch.dict(
"sys.modules",
{
@ -326,22 +339,22 @@ class TestTranslateWordsBatch:
},
),
patch.object(translator, "_ensure_argos_installed", lambda: None),
patch.object(translator, "_ensure_language_pair", lambda f, t: None),
patch.object(translator, "_ensure_language_pair", lambda _f, _t: None),
pytest.raises(RuntimeError, match="Translation failed"),
):
translate_words_batch(words, "en", "es", use_cache=False)
translator._argos_available = original
def test_batch_argos_unavailable_raises(self) -> None:
"""Test that batch translation raises ImportError when argos unavailable."""
with patch.object(
translator,
"_ensure_argos_installed",
side_effect=ImportError("argostranslate not available"),
with (
patch.object(
translator,
"_ensure_argos_installed",
side_effect=ImportError("argostranslate not available"),
),
pytest.raises(ImportError, match="argostranslate not available"),
):
with pytest.raises(ImportError, match="argostranslate not available"):
translate_words_batch(["hello", "world"], "en", "es", use_cache=False)
translate_words_batch(["hello", "world"], "en", "es", use_cache=False)
# format_translations tests
@ -358,7 +371,7 @@ class TestFormatTranslations:
def test_format_single_translation(self) -> None:
"""Test formatting single translation."""
results = [
TranslationResult("hello", "hola", "en", "es", True),
TranslationResult("hello", "hola", "en", "es", success=True),
]
output = format_translations(results)
@ -369,8 +382,8 @@ class TestFormatTranslations:
def test_format_multiple_translations(self) -> None:
"""Test formatting multiple translations."""
results = [
TranslationResult("hello", "hola", "en", "es", True),
TranslationResult("world", "mundo", "en", "es", True),
TranslationResult("hello", "hola", "en", "es", success=True),
TranslationResult("world", "mundo", "en", "es", success=True),
]
output = format_translations(results)
@ -382,8 +395,10 @@ class TestFormatTranslations:
def test_format_with_errors(self) -> None:
"""Test formatting with failed translations."""
results = [
TranslationResult("hello", "hola", "en", "es", True),
TranslationResult("xyz", "", "en", "es", False, "Unknown word"),
TranslationResult("hello", "hola", "en", "es", success=True),
TranslationResult(
"xyz", "", "en", "es", success=False, error="Unknown word"
),
]
output = format_translations(results, show_errors=True)
@ -393,8 +408,10 @@ class TestFormatTranslations:
def test_format_hide_errors(self) -> None:
"""Test formatting with errors hidden."""
results = [
TranslationResult("hello", "hola", "en", "es", True),
TranslationResult("xyz", "", "en", "es", False, "Unknown word"),
TranslationResult("hello", "hola", "en", "es", success=True),
TranslationResult(
"xyz", "", "en", "es", success=False, error="Unknown word"
),
]
output = format_translations(results, show_errors=False)
@ -408,7 +425,7 @@ class TestFormatTranslations:
class TestGetInstalledLanguages:
"""Tests for get_installed_languages function."""
def test_argos_unavailable(self, mock_argos_unavailable: None) -> None:
def test_argos_unavailable(self, _mock_argos_unavailable: None) -> None:
"""Test when argos is unavailable."""
result = get_installed_languages()
assert result == []
@ -433,21 +450,22 @@ class TestGetInstalledLanguages:
mock_parent.translate = mock_translate_module
mock_parent.package = mock_package_module
original = translator._argos_available
translator._argos_available = True
with patch.dict(
"sys.modules",
{
"argostranslate": mock_parent,
"argostranslate.translate": mock_translate_module,
"argostranslate.package": mock_package_module,
},
with (
patch.object(translator, "_check_argos", return_value=True),
patch.object(
translator, "argostranslate", mock_parent, create=True
),
patch.dict(
"sys.modules",
{
"argostranslate": mock_parent,
"argostranslate.translate": mock_translate_module,
"argostranslate.package": mock_package_module,
},
),
):
result = get_installed_languages()
translator._argos_available = original
assert ("en", "English") in result
assert ("es", "Spanish") in result
@ -458,7 +476,7 @@ class TestGetInstalledLanguages:
class TestGetAvailablePackages:
"""Tests for get_available_packages function."""
def test_argos_unavailable(self, mock_argos_unavailable: None) -> None:
def test_argos_unavailable(self, _mock_argos_unavailable: None) -> None:
"""Test when argos is unavailable."""
result = get_available_packages()
assert result == []
@ -470,7 +488,7 @@ class TestGetAvailablePackages:
class TestDownloadLanguages:
"""Tests for download_languages function."""
def test_argos_unavailable(self, mock_argos_unavailable: None) -> None:
def test_argos_unavailable(self, _mock_argos_unavailable: None) -> None:
"""Test when argos is unavailable."""
result = download_languages(["en", "es"])
assert result == {}
@ -503,7 +521,7 @@ class TestReadFile:
class TestMain:
"""Tests for main CLI function."""
def test_argos_unavailable_error(self, mock_argos_unavailable: None) -> None:
def test_argos_unavailable_error(self, _mock_argos_unavailable: None) -> None:
"""Test error when argos not installed."""
result = main(["--text", "hello", "--from", "en", "--to", "es"])
assert result == 1
@ -517,21 +535,22 @@ class TestMain:
mock_parent.translate = mock_translate_module
mock_parent.package = mock_package_module
original = translator._argos_available
translator._argos_available = True
with patch.dict(
"sys.modules",
{
"argostranslate": mock_parent,
"argostranslate.translate": mock_translate_module,
"argostranslate.package": mock_package_module,
},
with (
patch.object(translator, "_check_argos", return_value=True),
patch.object(
translator, "argostranslate", mock_parent, create=True
),
patch.dict(
"sys.modules",
{
"argostranslate": mock_parent,
"argostranslate.translate": mock_translate_module,
"argostranslate.package": mock_package_module,
},
),
):
result = main(["--list-languages"])
translator._argos_available = original
assert result == 0
captured = capsys.readouterr()
assert "No languages installed" in captured.out
@ -551,21 +570,22 @@ class TestMain:
mock_parent.translate = mock_translate_module
mock_parent.package = mock_package_module
original = translator._argos_available
translator._argos_available = True
with patch.dict(
"sys.modules",
{
"argostranslate": mock_parent,
"argostranslate.translate": mock_translate_module,
"argostranslate.package": mock_package_module,
},
with (
patch.object(translator, "_check_argos", return_value=True),
patch.object(
translator, "argostranslate", mock_parent, create=True
),
patch.dict(
"sys.modules",
{
"argostranslate": mock_parent,
"argostranslate.translate": mock_translate_module,
"argostranslate.package": mock_package_module,
},
),
):
result = main(["--list-languages"])
translator._argos_available = original
assert result == 0
captured = capsys.readouterr()
assert "en" in captured.out
@ -622,7 +642,6 @@ class TestMain:
def test_translate_output_to_file(
self,
tmp_path: Path,
capsys: pytest.CaptureFixture[str],
) -> None:
"""Test outputting translations to file."""
output_file = tmp_path / "output.txt"
@ -647,7 +666,9 @@ class TestMain:
assert "hello" in content
assert "hola" in content
def test_no_input_shows_help(self, capsys: pytest.CaptureFixture[str]) -> None:
def test_no_input_shows_help(
self,
) -> None:
"""Test that no input shows help."""
with ArgosAvailableMock():
result = main([])

View File

@ -89,7 +89,7 @@ class TestExcerptValidity:
"""Tests that verify excerpts are actually found in the source text."""
def test_excerpt_exists_in_source_text(self, sample_text_file: Path) -> None:
"""Test that each excerpt can be found in the source text as contiguous words."""
"""Test that each excerpt can be found in source text."""
import re
source_text = sample_text_file.read_text(encoding="utf-8").lower()

View File

@ -1,149 +1,163 @@
#!/usr/bin/env python3
"""Translator - translates words/text between languages.
r"""Translator - translates words/text between languages.
This module provides translation capabilities using either:
1. Argos Translate (offline, requires large downloads) - preferred if installed
2. deep-translator (online, uses Google Translate) - lightweight fallback
Usage:
1. Argos Translate (offline, requires large downloads)
2. deep-translator (online, uses Google Translate)
Usage::
# Translate a single word
python -m python_pkg.word_frequency.translator --text "hello" --from en --to es
python -m python_pkg.word_frequency.translator \\
--text "hello" --from en --to es
# Translate multiple words
python -m python_pkg.word_frequency.translator --words hello world goodbye --from en --to pl
python -m python_pkg.word_frequency.translator \\
--words hello world goodbye --from en --to pl
# Translate words from a file (one word per line)
python -m python_pkg.word_frequency.translator --words-file words.txt --from la --to en
python -m python_pkg.word_frequency.translator \\
--words-file words.txt --from la --to en
# List available languages
python -m python_pkg.word_frequency.translator --list-languages
python -m python_pkg.word_frequency.translator \\
--list-languages
# Output to file
python -m python_pkg.word_frequency.translator --words-file vocab.txt --from pl --to en --output translations.txt
python -m python_pkg.word_frequency.translator \\
--words-file vocab.txt --from pl --to en \\
--output translations.txt
Dependencies (install one):
pip install deep-translator # Lightweight, uses Google Translate (online)
pip install argostranslate # Offline translation (requires ~3GB downloads)
Dependencies (install one)::
pip install deep-translator
pip install argostranslate
"""
from __future__ import annotations
import argparse
import importlib
import logging
import os
from pathlib import Path
import subprocess
import sys
from typing import TYPE_CHECKING, NamedTuple
if TYPE_CHECKING:
from collections.abc import Sequence
# Lazy imports for translation backends (may not be installed)
_argos_available: bool | None = None
_deep_translator_available: bool | None = None
_langdetect_available: bool | None = None
_gpu_initialized: bool = False
_gpu_available: bool | None = None
# Optional dependencies are imported once at module load. When a package is
# missing, its name is bound to None so the rest of the module can test for
# availability with a plain ``is not None`` check instead of re-importing.
# GPU support: _check_cuda_available() requires torch to be non-None.
try:
    import torch
except ImportError:
    torch = None  # type: ignore[assignment]
# Offline backend: _check_argos() reports whether this import succeeded.
try:
    import argostranslate.package
    import argostranslate.translate
except ImportError:
    argostranslate = None  # type: ignore[assignment]
# Online backend: _check_deep_translator() reports whether this import succeeded.
try:
    from deep_translator import GoogleTranslator
except ImportError:
    GoogleTranslator = None
# Language auto-detection: detect_language() returns None without langdetect.
try:
    import langdetect
except ImportError:
    langdetect = None  # type: ignore[assignment]
# Translation cache: translate_word() skips cache reads and writes when this
# accessor is None.
try:
    from python_pkg.word_frequency.cache import (
        get_translation_cache,
    )
except ImportError:
    get_translation_cache = None
# Module-level logger; progress and error messages in this module go through it.
logger = logging.getLogger(__name__)
# Maximum number of leading characters of the input that detect_language()
# hands to langdetect.detect().
_LANG_DETECT_SAMPLE_SIZE = 5000
# NOTE(review): presumably the number of words sent per batch by the batch
# translation helpers; its use is not visible in this part of the file, so
# confirm before relying on this description.
_BATCH_SIZE = 100
class _TranslatorState:
    """Holds module-level state for lazy-initialized backends.

    The class is used as a namespace: attributes are read and written on the
    class itself rather than on instances.
    """

    # Set to True by _init_gpu_if_available() once it has run, both when CUDA
    # is absent and after GPU setup succeeds, so later calls return immediately.
    gpu_initialized: bool = False
def _check_cuda_available() -> bool:
"""Check if CUDA is available for GPU acceleration."""
global _gpu_available
if _gpu_available is None:
try:
import torch
return torch is not None and torch.cuda.is_available()
_gpu_available = torch.cuda.is_available()
except ImportError:
_gpu_available = False
return _gpu_available
def _validate_gpu_device() -> str:
    """Confirm that at least one CUDA device exists and name the first one.

    Returns:
        The name torch reports for CUDA device index 0.

    Raises:
        RuntimeError: If torch reports zero CUDA devices.
    """
    if torch.cuda.device_count() != 0:
        return torch.cuda.get_device_name(0)
    msg = "CUDA reports available but no GPU devices found"
    raise RuntimeError(msg)
def _init_gpu_if_available() -> None:
"""Initialize GPU for argostranslate if CUDA is available.
Raises:
RuntimeError: If CUDA is available but GPU initialization fails.
RuntimeError: If CUDA is available but GPU init fails.
"""
global _gpu_initialized
if _gpu_initialized:
if _TranslatorState.gpu_initialized:
return
if not _check_cuda_available():
_gpu_initialized = True
_TranslatorState.gpu_initialized = True
return
import sys
print("CUDA detected, initializing GPU acceleration...", file=sys.stderr)
logger.info(
"CUDA detected, initializing GPU acceleration..."
)
try:
import torch
# Force CTranslate2 to use CUDA
device_count = torch.cuda.device_count()
if device_count == 0:
raise RuntimeError("CUDA reports available but no GPU devices found")
device_name = torch.cuda.get_device_name(0)
print(f" Using GPU: {device_name}", file=sys.stderr)
# Set environment variable to force GPU usage in argos
import os
device_name = _validate_gpu_device()
logger.info(" Using GPU: %s", device_name)
os.environ["CT2_CUDA_ALLOW_FP16"] = "1"
os.environ["CT2_USE_EXPERIMENTAL_PACKED_GEMM"] = "1"
_gpu_initialized = True
print(" GPU acceleration enabled.", file=sys.stderr)
_TranslatorState.gpu_initialized = True
logger.info(" GPU acceleration enabled.")
except Exception as e:
raise RuntimeError(
f"CUDA is available but GPU initialization failed: {e}\n"
f"This may be due to incompatible CUDA version or driver issues.\n"
f"To disable GPU and use CPU only, set environment variable: CT2_FORCE_CPU=1"
) from e
msg = (
f"CUDA is available but GPU initialization failed: "
f"{e}\nThis may be due to incompatible CUDA "
"version or driver issues.\n"
"To disable GPU and use CPU only, set "
"environment variable: CT2_FORCE_CPU=1"
)
raise RuntimeError(msg) from e
def _check_argos() -> bool:
"""Check if argostranslate is available."""
global _argos_available
if _argos_available is None:
try:
import argostranslate.package
import argostranslate.translate
_ = (argostranslate.package, argostranslate.translate)
_argos_available = True
except ImportError:
_argos_available = False
return _argos_available
return argostranslate is not None
def _check_deep_translator() -> bool:
"""Check if deep-translator is available."""
global _deep_translator_available
if _deep_translator_available is None:
try:
from deep_translator import GoogleTranslator
_ = GoogleTranslator
_deep_translator_available = True
except ImportError:
_deep_translator_available = False
return _deep_translator_available
return GoogleTranslator is not None
def _check_langdetect() -> bool:
"""Check if langdetect is available."""
global _langdetect_available
if _langdetect_available is None:
try:
import langdetect
_ = langdetect
_langdetect_available = True
except ImportError:
_langdetect_available = False
return _langdetect_available
return langdetect is not None
def detect_language(text: str) -> str | None:
@ -158,13 +172,14 @@ def detect_language(text: str) -> str | None:
if not _check_langdetect():
return None
import langdetect
try:
# Use a sample of the text for detection (faster and more reliable)
sample = text[:5000] if len(text) > 5000 else text
return langdetect.detect(sample) # type: ignore[no-any-return]
except langdetect.LangDetectException: # type: ignore[attr-defined]
sample = (
text[:_LANG_DETECT_SAMPLE_SIZE]
if len(text) > _LANG_DETECT_SAMPLE_SIZE
else text
)
return langdetect.detect(sample) # type: ignore[no-any-return,union-attr]
except langdetect.LangDetectException: # type: ignore[attr-defined,union-attr]
return None
@ -188,8 +203,6 @@ def get_installed_languages() -> list[tuple[str, str]]:
if not _check_argos():
return []
import argostranslate.translate
languages = argostranslate.translate.get_installed_languages()
return [(lang.code, lang.name) for lang in languages]
@ -203,8 +216,6 @@ def get_available_packages() -> list[tuple[str, str, str, str]]:
if not _check_argos():
return []
import argostranslate.package
argostranslate.package.update_package_index()
available = argostranslate.package.get_available_packages()
return [
@ -227,12 +238,10 @@ def download_languages(lang_codes: Sequence[str]) -> dict[str, bool]:
if not _check_argos():
return {}
import argostranslate.package
results: dict[str, bool] = {}
# Update package index
print("Updating package index...")
logger.info("Updating package index...")
argostranslate.package.update_package_index()
available = argostranslate.package.get_available_packages()
@ -255,13 +264,26 @@ def download_languages(lang_codes: Sequence[str]) -> dict[str, bool]:
if pkg_key in available_lookup:
pkg = available_lookup[pkg_key]
try:
print(f"Downloading {from_code} -> {to_code}...")
logger.info(
"Downloading %s -> %s...",
from_code,
to_code,
)
argostranslate.package.install_from_path(pkg.download())
results[key] = True
print(f" ✓ Installed {from_code} -> {to_code}")
except Exception as e: # noqa: BLE001
logger.info(
" Installed %s -> %s",
from_code,
to_code,
)
except (OSError, RuntimeError, ValueError) as e:
results[key] = False
print(f" ✗ Failed {from_code} -> {to_code}: {e}")
logger.info(
" Failed %s -> %s: %s",
from_code,
to_code,
e,
)
else:
# Package not available
results[key] = False
@ -278,32 +300,38 @@ def _ensure_argos_installed() -> None:
if _check_argos():
return
import subprocess
import sys
print("argostranslate not found. Attempting to install...")
logger.info("argostranslate not found. Attempting to install...")
try:
subprocess.run(
[sys.executable, "-m", "pip", "install", "argostranslate"],
check=True,
capture_output=True,
)
# Reset the check flag and verify
global _argos_available
_argos_available = None
if not _check_argos():
raise ImportError("argostranslate installation succeeded but import failed")
print("argostranslate installed successfully.")
# Attempt runtime re-import
importlib.import_module("argostranslate.package")
importlib.import_module("argostranslate.translate")
logger.info("argostranslate installed successfully.")
except subprocess.CalledProcessError as e:
error_msg = e.stderr.decode() if e.stderr else str(e)
raise ImportError(
f"argostranslate is required for offline translation.\n\n"
f"Install manually with one of:\n"
f" pip install argostranslate # In a virtualenv\n"
f" pipx install argostranslate # System-wide via pipx\n"
f" pacman -S python-argostranslate # Arch Linux (if available)\n\n"
msg = (
"argostranslate is required for offline "
"translation.\n\n"
"Install manually with one of:\n"
" pip install argostranslate"
" # In a virtualenv\n"
" pipx install argostranslate"
" # System-wide via pipx\n"
" pacman -S python-argostranslate"
" # Arch Linux (if available)\n\n"
f"Original error: {error_msg}"
) from e
)
raise ImportError(msg) from e
except ImportError:
msg = (
"argostranslate installation succeeded but "
"import failed"
)
raise ImportError(msg) from None
def _ensure_language_pair(from_lang: str, to_lang: str) -> None:
@ -316,11 +344,9 @@ def _ensure_language_pair(from_lang: str, to_lang: str) -> None:
Raises:
ValueError: If language pair cannot be obtained.
"""
import argostranslate.package
import argostranslate.translate
# Check if already installed
installed_languages = argostranslate.translate.get_installed_languages()
installed_languages = (
argostranslate.translate.get_installed_languages()
)
from_lang_obj = None
to_lang_obj = None
@ -337,37 +363,44 @@ def _ensure_language_pair(from_lang: str, to_lang: str) -> None:
return # Already available
# Need to download
import sys
print(
f"Downloading language pack: {from_lang} -> {to_lang}...",
file=sys.stderr,
logger.info(
"Downloading language pack: %s -> %s...",
from_lang,
to_lang,
)
print(" Fetching package index...", file=sys.stderr)
logger.info(" Fetching package index...")
argostranslate.package.update_package_index()
available = argostranslate.package.get_available_packages()
pkg = next(
(p for p in available if p.from_code == from_lang and p.to_code == to_lang),
(
p
for p in available
if p.from_code == from_lang and p.to_code == to_lang
),
None,
)
if pkg is None:
raise ValueError(
f"No language pack available for {from_lang} -> {to_lang}. "
f"Available pairs can be listed with --list-languages."
msg = (
f"No language pack available for "
f"{from_lang} -> {to_lang}. "
"Available pairs can be listed with "
"--list-languages."
)
raise ValueError(msg)
print(
" Downloading package (~50-100MB, this may take a minute)...",
file=sys.stderr,
logger.info(
" Downloading package (~50-100MB, "
"this may take a minute)...",
)
download_path = pkg.download()
print(" Installing language pack...", file=sys.stderr)
logger.info(" Installing language pack...")
argostranslate.package.install_from_path(download_path)
print(
f"Language pack {from_lang} -> {to_lang} installed.",
file=sys.stderr,
logger.info(
"Language pack %s -> %s installed.",
from_lang,
to_lang,
)
@ -393,38 +426,30 @@ def translate_word(
ImportError: If argostranslate is not available and cannot be installed.
"""
# Check cache first
if use_cache:
try:
from python_pkg.word_frequency.cache import get_translation_cache
cache = get_translation_cache()
cached = cache.get(word, from_lang, to_lang)
if cached is not None:
return TranslationResult(
source_word=word,
translated_word=cached,
source_lang=from_lang,
target_lang=to_lang,
success=True,
)
except ImportError:
pass # Cache not available
if use_cache and get_translation_cache is not None:
cache = get_translation_cache()
cached = cache.get(word, from_lang, to_lang)
if cached is not None:
return TranslationResult(
source_word=word,
translated_word=cached,
source_lang=from_lang,
target_lang=to_lang,
success=True,
)
# Ensure argos is installed (will raise if it can't be)
_ensure_argos_installed()
import argostranslate.translate
try:
translated = argostranslate.translate.translate(word, from_lang, to_lang)
translated = argostranslate.translate.translate(
word, from_lang, to_lang,
)
# Cache the result
if use_cache:
try:
from python_pkg.word_frequency.cache import get_translation_cache
get_translation_cache().set(word, from_lang, to_lang, translated)
except ImportError:
pass
if use_cache and get_translation_cache is not None:
get_translation_cache().set(
word, from_lang, to_lang, translated,
)
return TranslationResult(
source_word=word,
translated_word=translated,
@ -432,7 +457,7 @@ def translate_word(
target_lang=to_lang,
success=True,
)
except Exception as e: # noqa: BLE001
except (OSError, RuntimeError, ValueError, TypeError) as e:
return TranslationResult(
source_word=word,
translated_word="",
@ -483,8 +508,6 @@ def _translate_batch_worker(
Returns:
Tuple of (batch_idx, translations dict).
"""
import argostranslate.translate
translations: dict[str, str] = {}
# Batch translate by joining with newlines
@ -507,6 +530,78 @@ def _translate_batch_worker(
return batch_idx, translations
def _run_batch_translation(
words_to_translate: list[str],
from_lang: str,
to_lang: str,
) -> dict[str, str]:
"""Translate a list of words in batches with progress logging.
Args:
words_to_translate: Words needing translation.
from_lang: Source language code.
to_lang: Target language code.
Returns:
Dict mapping lowercased words to translations.
Raises:
RuntimeError: If translation fails.
"""
new_translations: dict[str, str] = {}
num_to_translate = len(words_to_translate)
gpu_status = (
" (GPU)" if _check_cuda_available() else " (CPU)"
)
logger.info(
"Translating %d words from %s to %s%s...",
num_to_translate,
from_lang,
to_lang,
gpu_status,
)
try:
batches = [
words_to_translate[i : i + _BATCH_SIZE]
for i in range(0, num_to_translate, _BATCH_SIZE)
]
total_batches = len(batches)
for batch_idx, batch_words in enumerate(batches):
words_done = min(
(batch_idx + 1) * _BATCH_SIZE,
num_to_translate,
)
pct = int(words_done / num_to_translate * 100)
logger.info(
" [%3d%%] Translating batch %d/%d "
"(%d/%d words)...",
pct,
batch_idx + 1,
total_batches,
words_done,
num_to_translate,
)
_, batch_translations = _translate_batch_worker(
batch_words, from_lang, to_lang, batch_idx,
)
new_translations.update(batch_translations)
logger.info(" Translation complete.")
except Exception as e:
msg = (
f"Translation failed for "
f"{from_lang} -> {to_lang}: {e}"
)
raise RuntimeError(msg) from e
return new_translations
def translate_words_batch(
words: Sequence[str],
from_lang: str,
@ -535,90 +630,36 @@ def translate_words_batch(
if not words:
return []
# Ensure argos is installed (will raise if it can't be)
_ensure_argos_installed()
# Initialize GPU if available (will raise if CUDA available but fails)
_init_gpu_if_available()
# Ensure language pair is available
_ensure_language_pair(from_lang, to_lang)
# Check cache for already-translated words
cached_results: dict[str, str] = {}
words_to_translate: list[str] = []
if use_cache:
try:
from python_pkg.word_frequency.cache import get_translation_cache
cache = get_translation_cache()
cached_results = cache.get_many(list(words), from_lang, to_lang)
except ImportError:
pass
if use_cache and get_translation_cache is not None:
cache = get_translation_cache()
cached_results = cache.get_many(
list(words), from_lang, to_lang,
)
# Find words that still need translation
for word in words:
if word.lower() not in cached_results:
words_to_translate.append(word)
words_to_translate = [
word for word in words
if word.lower() not in cached_results
]
# Translate uncached words using argos batch
new_translations: dict[str, str] = {}
if words_to_translate:
import sys
num_to_translate = len(words_to_translate)
# Check if GPU is being used
gpu_status = " (GPU)" if _gpu_available else " (CPU)"
print(
f"Translating {num_to_translate} words from {from_lang} to {to_lang}{gpu_status}...",
file=sys.stderr,
flush=True,
new_translations = _run_batch_translation(
words_to_translate, from_lang, to_lang,
)
try:
# Split into batches - larger batches are faster but show progress less often
BATCH_SIZE = 100
batches: list[list[str]] = []
for i in range(0, num_to_translate, BATCH_SIZE):
batches.append(words_to_translate[i : i + BATCH_SIZE])
total_batches = len(batches)
# Sequential translation with progress
# (argostranslate is not thread-safe - uses global model)
for batch_idx, batch_words in enumerate(batches):
words_done = (batch_idx + 1) * BATCH_SIZE
words_done = min(words_done, num_to_translate)
pct = int(words_done / num_to_translate * 100)
print(
f" [{pct:3d}%] Translating batch {batch_idx + 1}/{total_batches} "
f"({words_done}/{num_to_translate} words)...",
file=sys.stderr,
flush=True,
)
_, batch_translations = _translate_batch_worker(
batch_words, from_lang, to_lang, batch_idx
)
new_translations.update(batch_translations)
print(" Translation complete.", file=sys.stderr, flush=True)
except Exception as e:
raise RuntimeError(
f"Translation failed for {from_lang} -> {to_lang}: {e}"
) from e
# Cache new translations
if use_cache and new_translations:
try:
from python_pkg.word_frequency.cache import get_translation_cache
get_translation_cache().set_many(new_translations, from_lang, to_lang)
except ImportError:
pass
if use_cache and get_translation_cache is not None:
get_translation_cache().set_many(
new_translations, from_lang, to_lang,
)
# Merge cached and new translations
all_translations = {**cached_results, **new_translations}
@ -694,22 +735,14 @@ def read_file(filepath: str | Path) -> str:
return Path(filepath).read_text(encoding="utf-8")
def main(argv: Sequence[str] | None = None) -> int:
"""Main entry point for the translator.
Args:
argv: Command line arguments.
Returns:
Exit code.
"""
def _build_parser() -> argparse.ArgumentParser:
"""Build the argument parser for the translator CLI."""
parser = argparse.ArgumentParser(
description="Offline translator using Argos Translate.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
# Actions
action_group = parser.add_mutually_exclusive_group()
action_group.add_argument(
"--list-languages",
@ -728,10 +761,12 @@ def main(argv: Sequence[str] | None = None) -> int:
"-d",
nargs="+",
metavar="LANG",
help="Download language packs (e.g., --download en es pl)",
help=(
"Download language packs "
"(e.g., --download en es pl)"
),
)
# Input
input_group = parser.add_mutually_exclusive_group()
input_group.add_argument(
"--text",
@ -752,7 +787,6 @@ def main(argv: Sequence[str] | None = None) -> int:
help="File with words to translate (one per line)",
)
# Language options
parser.add_argument(
"--from",
"-f",
@ -769,8 +803,6 @@ def main(argv: Sequence[str] | None = None) -> int:
default="en",
help="Target language code (default: en)",
)
# Output
parser.add_argument(
"--output",
"-o",
@ -778,87 +810,142 @@ def main(argv: Sequence[str] | None = None) -> int:
help="Output file path",
)
args = parser.parse_args(argv)
return parser
# Check if argostranslate is available
if not _check_argos():
print(
"Error: argostranslate is not installed.\n"
"Install it with: pip install argostranslate",
file=sys.stderr,
def _handle_list_languages() -> int:
"""Handle --list-languages command."""
langs = get_installed_languages()
if not langs:
sys.stdout.write("No languages installed.\n")
sys.stdout.write(
"Download some with: --download en es pl de fr\n",
)
return 1
else:
sys.stdout.write("Installed languages:\n")
for code, name in sorted(langs):
sys.stdout.write(f" {code}: {name}\n")
return 0
# Handle list-languages
if args.list_languages:
langs = get_installed_languages()
if not langs:
print("No languages installed.")
print("Download some with: --download en es pl de fr")
else:
print("Installed languages:")
for code, name in sorted(langs):
print(f" {code}: {name}")
return 0
# Handle list-available
if args.list_available:
packages = get_available_packages()
if not packages:
print("No packages available (check internet connection).")
else:
print("Available language packages:")
for from_code, from_name, to_code, to_name in sorted(packages):
print(f" {from_code} ({from_name}) -> {to_code} ({to_name})")
return 0
def _handle_list_available() -> int:
    """Write the downloadable language packages to stdout.

    Fetches the package list via ``get_available_packages`` and
    prints one line per package in sorted order, or a short notice
    when the list is empty.

    Returns:
        Always 0.
    """
    available = get_available_packages()

    # Guard clause: nothing to list.
    if not available:
        sys.stdout.write(
            "No packages available (check internet connection).\n",
        )
        return 0

    sys.stdout.write("Available language packages:\n")
    for src_code, src_name, dst_code, dst_name in sorted(available):
        entry = f" {src_code} ({src_name}) -> {dst_code} ({dst_name})\n"
        sys.stdout.write(entry)
    return 0
# Handle download
if args.download:
download_results = download_languages(args.download)
success_count = sum(1 for v in download_results.values() if v)
print(f"\nDownloaded {success_count}/{len(download_results)} language pairs.")
return 0 if success_count > 0 else 1
# Handle translation
words: list[str] = []
def _handle_download(lang_codes: list[str]) -> int:
    """Download language packs and report how many succeeded.

    Args:
        lang_codes: Language codes handed to ``download_languages``.

    Returns:
        0 when at least one download result is truthy, otherwise 1.
    """
    outcome = download_languages(lang_codes)
    attempted = len(outcome)
    # Each truthy result counts as one successful pair.
    succeeded = sum(bool(ok) for ok in outcome.values())

    sys.stdout.write(
        f"\nDownloaded {succeeded}/{attempted} language pairs.\n",
    )
    if succeeded > 0:
        return 0
    return 1
def _collect_words(
args: argparse.Namespace,
) -> list[str] | None:
"""Collect words from args. Returns None on error."""
if args.text:
words = [args.text]
elif args.words:
words = args.words
elif args.words_file:
return [args.text]
if args.words:
return args.words
if args.words_file:
try:
content = read_file(args.words_file)
words = [w.strip() for w in content.splitlines() if w.strip()]
except FileNotFoundError:
print(f"Error: File not found: {args.words_file}", file=sys.stderr)
return 1
sys.stderr.write(
f"Error: File not found: {args.words_file}\n",
)
return None
return [
w.strip()
for w in content.splitlines()
if w.strip()
]
return []
if not words:
parser.print_help()
return 1
# Translate
def _handle_translation(args: argparse.Namespace) -> int:
"""Handle the translation action."""
try:
results = translate_words_batch(words, args.from_lang, args.to_lang)
except ImportError as e:
print(f"Error: {e}", file=sys.stderr)
results = translate_words_batch(
args.words, args.from_lang, args.to_lang,
)
except ImportError:
logger.exception("Translation import error")
return 1
output = format_translations(results)
# Output
if args.output:
Path(args.output).write_text(output, encoding="utf-8")
print(f"Translations written to {args.output}")
sys.stdout.write(
f"Translations written to {args.output}\n",
)
else:
print(output)
sys.stdout.write(output + "\n")
# Return error if any translation failed
if any(not r.success for r in results):
return 1
return 0
def main(argv: Sequence[str] | None = None) -> int:
    """Run the translator command line interface.

    Parses the arguments, verifies that argostranslate is usable,
    then dispatches to the first requested action: list installed
    languages, list available packages, download packs, or
    translate the collected words.

    Args:
        argv: Command line arguments.

    Returns:
        Exit code.
    """
    cli = _build_parser()
    opts = cli.parse_args(argv)

    if not _check_argos():
        sys.stderr.write(
            "Error: argostranslate is not installed.\n"
            "Install it with: pip install argostranslate\n",
        )
        return 1

    # Informational / maintenance actions take precedence over
    # translation.
    if opts.list_languages:
        return _handle_list_languages()
    if opts.list_available:
        return _handle_list_available()
    if opts.download:
        return _handle_download(opts.download)

    collected = _collect_words(opts)
    if collected is None:
        # _collect_words returned None: fail without printing help.
        return 1
    if not collected:
        # No input was supplied at all: show usage.
        cli.print_help()
        return 1

    # The translation handler reads the words from the namespace.
    opts.words = collected
    return _handle_translation(opts)
if __name__ == "__main__":
sys.exit(main())

View File

@ -14,7 +14,9 @@ Usage:
from __future__ import annotations
import argparse
import logging
from pathlib import Path
import re
import sys
from typing import TYPE_CHECKING, NamedTuple
@ -27,6 +29,9 @@ except ImportError:
from analyzer import analyze_text, read_file
logger = logging.getLogger(__name__)
class ExcerptAnalysis(NamedTuple):
"""Analysis result for an excerpt length."""
@ -111,8 +116,6 @@ def find_optimal_excerpts(
ranked_words = [word for word, _ in word_counts.most_common()]
# Extract all words from text (preserving order)
import re
all_words = re.findall(r"\b[\w]+\b", text, re.UNICODE)
if not case_sensitive:
all_words = [w.lower() for w in all_words]
@ -150,6 +153,9 @@ def find_optimal_excerpts(
return results
_MAX_EXCERPT_DISPLAY_LEN = 50
def format_results(
results: list[ExcerptAnalysis],
*,
@ -198,7 +204,7 @@ def format_results(
if show_excerpts:
# Truncate long excerpts
excerpt = r.best_excerpt
if len(excerpt) > 50:
if len(excerpt) > _MAX_EXCERPT_DISPLAY_LEN:
excerpt = excerpt[:47] + "..."
lines.append(f"{r.excerpt_length:>6} {r.min_vocab_needed:>5} {excerpt}")
else:
@ -285,10 +291,7 @@ def main(argv: Sequence[str] | None = None) -> int:
args = parser.parse_args(argv)
try:
if args.text:
text = args.text
else:
text = read_file(args.file)
text = args.text or read_file(args.file)
results = find_optimal_excerpts(
text,
@ -304,15 +307,15 @@ def main(argv: Sequence[str] | None = None) -> int:
if args.output:
Path(args.output).write_text(output, encoding="utf-8")
print(f"Output written to {args.output}")
logger.info("Output written to %s", args.output)
else:
print(output)
logger.info("%s", output)
except FileNotFoundError as e:
print(f"Error: File not found - {e}", file=sys.stderr)
except FileNotFoundError:
logger.exception("File not found")
return 1
except UnicodeDecodeError as e:
print(f"Error: Could not decode file - {e}", file=sys.stderr)
except UnicodeDecodeError:
logger.exception("Could not decode file")
return 1
return 0