feat: automatic language detection translation and anki generator with cache

This commit is contained in:
Krzysztof Rudnicki 2025-12-29 14:41:56 +01:00
parent 1411e685c2
commit d2b6f00185
10 changed files with 3826 additions and 299 deletions

View File

@ -158,10 +158,21 @@ static void assign_ranks(void) {
/* Sort all_entries by frequency (this doesn't affect word_sequence) */
qsort(all_entries, num_unique_words, sizeof(WordEntry *), compare_by_count);
/* Assign 1-indexed ranks */
/* Assign 1-indexed ranks using competition ranking:
* Words with same frequency get same rank.
* Next rank is current_position + 1 (skipping numbers).
* Example: counts 5,3,3,2 -> ranks 1,2,2,4 (not 1,2,3,4) */
for (int i = 0; i < num_unique_words; i++) {
if (i == 0) {
all_entries[i]->rank = 1;
} else if (all_entries[i]->count == all_entries[i-1]->count) {
/* Same frequency as previous word - same rank */
all_entries[i]->rank = all_entries[i-1]->rank;
} else {
/* Different frequency - rank is position + 1 */
all_entries[i]->rank = i + 1;
}
}
}
/* Analyze excerpt and return max rank needed */
@ -306,21 +317,43 @@ static void cleanup(void) {
}
}
/* Dump all vocabulary with ranks (for Python integration) */
static void dump_vocabulary(int max_rank) {
printf("VOCAB_DUMP_START\n");
for (int i = 0; i < num_unique_words; i++) {
if (all_entries[i]->rank <= max_rank) {
printf("%s;%d\n", all_entries[i]->word, all_entries[i]->rank);
}
}
printf("VOCAB_DUMP_END\n");
}
int main(int argc, char *argv[]) {
if (argc < 2) {
fprintf(stderr, "Usage: %s <file.txt> [max_length]\n", argv[0]);
fprintf(stderr, "Usage: %s <file.txt> [max_length] [--dump-vocab [max_rank]]\n", argv[0]);
fprintf(stderr, " max_length: maximum excerpt length to analyze (default: 30)\n");
fprintf(stderr, " --dump-vocab: output all words with ranks up to max_rank\n");
return 1;
}
const char *filename = argv[1];
int max_length = 30;
bool dump_vocab = false;
int dump_max_rank = 0;
if (argc >= 3) {
max_length = atoi(argv[2]);
/* Parse arguments */
for (int i = 2; i < argc; i++) {
if (strcmp(argv[i], "--dump-vocab") == 0) {
dump_vocab = true;
if (i + 1 < argc && argv[i + 1][0] != '-') {
dump_max_rank = atoi(argv[++i]);
}
} else if (argv[i][0] != '-') {
max_length = atoi(argv[i]);
if (max_length < 1) max_length = 1;
if (max_length > 1000) max_length = 1000;
}
}
/* Initialize hash table */
memset(hash_table, 0, sizeof(hash_table));
@ -351,6 +384,17 @@ int main(int argc, char *argv[]) {
/* Print results */
print_results(results, max_length);
/* Dump vocabulary if requested */
if (dump_vocab) {
/* If no max_rank specified, use the max from the excerpt */
if (dump_max_rank == 0 && max_length > 0) {
dump_max_rank = results[max_length - 1].min_vocab_needed;
}
if (dump_max_rank > 0) {
dump_vocabulary(dump_max_rank);
}
}
/* Cleanup */
free(results);
cleanup();

Binary file not shown.

View File

@ -40,10 +40,10 @@ try:
detect_language,
translate_words_batch,
)
from python_pkg.word_frequency.analyzer import read_file, analyze_text
from python_pkg.word_frequency.analyzer import read_file
except ImportError:
from translator import detect_language, translate_words_batch
from analyzer import read_file, analyze_text
from analyzer import read_file
# Path to C vocabulary_curve executable
@ -59,12 +59,13 @@ class VocabWord(NamedTuple):
context: str
def run_vocabulary_curve(filepath: Path, max_length: int) -> str:
def run_vocabulary_curve(filepath: Path, max_length: int, *, dump_vocab: bool = False) -> str:
"""Run the C vocabulary_curve executable.
Args:
filepath: Path to the text file.
max_length: Maximum excerpt length.
dump_vocab: If True, also dump all vocabulary up to max rank needed.
Returns:
Output from the executable.
@ -79,8 +80,12 @@ def run_vocabulary_curve(filepath: Path, max_length: int) -> str:
"Please compile it first: cd C/vocabulary_curve && make"
)
cmd = [str(C_EXECUTABLE), str(filepath), str(max_length)]
if dump_vocab:
cmd.append("--dump-vocab")
result = subprocess.run(
[str(C_EXECUTABLE), str(filepath), str(max_length)],
cmd,
capture_output=True,
text=True,
timeout=120,
@ -89,7 +94,7 @@ def run_vocabulary_curve(filepath: Path, max_length: int) -> str:
return result.stdout
def parse_vocabulary_curve_output(output: str, target_length: int) -> tuple[str, list[tuple[str, int]]]:
def parse_vocabulary_curve_output(output: str, target_length: int) -> tuple[str, list[tuple[str, int]], list[tuple[str, int]]]:
"""Parse output from vocabulary_curve to get words needed.
Args:
@ -97,11 +102,14 @@ def parse_vocabulary_curve_output(output: str, target_length: int) -> tuple[str,
target_length: The target excerpt length.
Returns:
Tuple of (excerpt_text, list of (word, rank) tuples).
Tuple of (excerpt_text, excerpt_words, all_vocab_words).
excerpt_words: words in the excerpt with their ranks.
all_vocab_words: all words up to max rank (from VOCAB_DUMP if present).
"""
lines = output.split("\n")
excerpt = ""
words: list[tuple[str, int]] = []
excerpt_words: list[tuple[str, int]] = []
all_vocab: list[tuple[str, int]] = []
# Find the line for the target length
i = 0
@ -131,26 +139,28 @@ def parse_vocabulary_curve_output(output: str, target_length: int) -> tuple[str,
# Parse "word(#rank), word2(#rank2), ..."
pattern = r"(\S+)\(#(\d+)\)"
matches = re.findall(pattern, words_part)
words = [(w, int(r)) for w, r in matches]
excerpt_words = [(w, int(r)) for w, r in matches]
break
i += 1
return excerpt, words
# Parse VOCAB_DUMP section if present
in_vocab_dump = False
for line in lines:
if line.strip() == "VOCAB_DUMP_START":
in_vocab_dump = True
continue
if line.strip() == "VOCAB_DUMP_END":
break
if in_vocab_dump and ";" in line:
parts = line.strip().split(";")
if len(parts) == 2:
word, rank_str = parts
try:
all_vocab.append((word, int(rank_str)))
except ValueError:
pass
def get_top_n_words(text: str, n: int) -> list[tuple[str, int]]:
"""Get the top N most frequent words from text.
Args:
text: The source text.
n: Number of top words to return.
Returns:
List of (word, rank) tuples, ranked 1 to n.
"""
word_counts = analyze_text(text)
sorted_words = sorted(word_counts.items(), key=lambda x: (-x[1], x[0]))
return [(word, rank + 1) for rank, (word, _) in enumerate(sorted_words[:n])]
return excerpt, excerpt_words, all_vocab
def find_word_contexts(
@ -196,6 +206,8 @@ def generate_anki_deck(
deck_name: str = "Vocabulary",
include_context: bool = False,
no_translate: bool = False,
excerpt: str = "",
excerpt_words: list[tuple[str, int]] | None = None,
) -> str:
"""Generate Anki-compatible deck content.
@ -207,6 +219,8 @@ def generate_anki_deck(
deck_name: Name for the deck.
include_context: Whether to include context in cards.
no_translate: If True, skip translation (use placeholder).
excerpt: The target excerpt text to include in cards.
excerpt_words: List of (word, rank) tuples for words in the excerpt.
Returns:
Semicolon-separated content ready for Anki import.
@ -224,6 +238,27 @@ def generate_anki_deck(
lines.append("#columns:Front;Back;Rank")
lines.append("") # Empty line before data
# Add excerpt as first card (goal/context card)
if excerpt:
excerpt_escaped = excerpt.replace(";", ",")
# Use excerpt_words from C output (has correct ranks)
if excerpt_words:
# Most frequent = lowest rank (italics), rarest = highest rank (bold)
most_frequent = min(excerpt_words, key=lambda x: x[1])[0]
rarest = max(excerpt_words, key=lambda x: x[1])[0]
# Apply formatting - rarest first (bold), then most frequent (italics)
# to avoid nested tag issues if they're the same word
if most_frequent != rarest:
pattern_rare = re.compile(rf"\b({re.escape(rarest)})\b", re.IGNORECASE)
excerpt_escaped = pattern_rare.sub(r"<b>\1</b>", excerpt_escaped)
pattern_freq = re.compile(rf"\b({re.escape(most_frequent)})\b", re.IGNORECASE)
excerpt_escaped = pattern_freq.sub(r"<i>\1</i>", excerpt_escaped)
else:
# Same word is both most and least frequent - use bold+italic
pattern = re.compile(rf"\b({re.escape(most_frequent)})\b", re.IGNORECASE)
excerpt_escaped = pattern.sub(r"<b><i>\1</i></b>", excerpt_escaped)
lines.append(f"📖 TARGET EXCERPT;{excerpt_escaped};#0")
# Get translations (or skip if no_translate)
words = [w for w, _ in words_with_ranks]
if no_translate:
@ -263,6 +298,120 @@ def generate_anki_deck(
return "\n".join(lines)
def get_cached_excerpt(
filepath: Path, length: int, *, force: bool = False
) -> tuple[str, list[tuple[str, int]]] | None:
"""Get cached excerpt if available.
Args:
filepath: Path to source file.
length: Excerpt length.
force: If True, ignore cache.
Returns:
Tuple of (excerpt, words) or None if not cached.
"""
if force:
return None
try:
from python_pkg.word_frequency.cache import get_vocab_curve_cache
return get_vocab_curve_cache().get(filepath, length)
except ImportError:
return None
def cache_excerpt(
filepath: Path, length: int, excerpt: str, words: list[tuple[str, int]]
) -> None:
"""Store excerpt in cache.
Args:
filepath: Path to source file.
length: Excerpt length.
excerpt: The excerpt text.
words: List of (word, rank) tuples.
"""
try:
from python_pkg.word_frequency.cache import get_vocab_curve_cache
get_vocab_curve_cache().set(filepath, length, excerpt, words)
except ImportError:
pass
def get_cached_deck(
filepath: Path,
length: int,
target_lang: str,
include_context: bool,
all_vocab: bool,
*,
force: bool = False,
) -> tuple[str, str, int, int] | None:
"""Get cached Anki deck if available.
Args:
filepath: Path to source file.
length: Excerpt length.
target_lang: Target language.
include_context: Whether context is included.
all_vocab: Whether all vocab is included.
force: If True, ignore cache.
Returns:
Tuple of (content, excerpt, num_words, max_rank) or None.
"""
if force:
return None
try:
from python_pkg.word_frequency.cache import get_anki_deck_cache
return get_anki_deck_cache().get(
filepath, length, target_lang, include_context, all_vocab
)
except ImportError:
return None
def cache_deck(
filepath: Path,
length: int,
target_lang: str,
include_context: bool,
all_vocab: bool,
anki_content: str,
excerpt: str,
num_words: int,
max_rank: int,
) -> None:
"""Store Anki deck in cache.
Args:
filepath: Path to source file.
length: Excerpt length.
target_lang: Target language.
include_context: Whether context is included.
all_vocab: Whether all vocab is included.
anki_content: The deck content.
excerpt: The excerpt text.
num_words: Number of words.
max_rank: Maximum rank.
"""
try:
from python_pkg.word_frequency.cache import get_anki_deck_cache
get_anki_deck_cache().set(
filepath,
length,
target_lang,
include_context,
all_vocab,
anki_content,
excerpt,
num_words,
max_rank,
)
except ImportError:
pass
def generate_flashcards(
filepath: str | Path,
excerpt_length: int,
@ -272,6 +421,8 @@ def generate_flashcards(
deck_name: str | None = None,
all_vocab: bool = True,
no_translate: bool = False,
*,
force: bool = False,
) -> tuple[str, str, int, int]:
"""Generate Anki flashcards for vocabulary needed for an excerpt length.
@ -285,26 +436,39 @@ def generate_flashcards(
all_vocab: If True, include ALL words from rank 1 to max rank needed.
If False, only include words that appear in the excerpt.
no_translate: If True, skip translation.
force: If True, ignore all caches and regenerate.
Returns:
Tuple of (anki_content, excerpt, num_words, max_rank).
"""
filepath = Path(filepath)
# Read the text
text = read_file(filepath)
# Check for cached full deck (if not using no_translate)
if not no_translate and not force:
cached = get_cached_deck(
filepath, excerpt_length, target_lang, include_context, all_vocab
)
if cached is not None:
return cached
# Read the text (only needed for context finding)
text = read_file(filepath) if include_context else ""
# Auto-detect language if not provided
if source_lang is None:
source_lang = detect_language(text)
sample_text = read_file(filepath)[:1000] if not text else text[:1000]
source_lang = detect_language(sample_text)
if source_lang is None:
source_lang = "auto"
raise ValueError(
"Could not auto-detect source language. "
"Please specify with --from (e.g., --from pl for Polish). "
"Install langdetect for auto-detection: pip install langdetect"
)
# Run vocabulary curve analysis
output = run_vocabulary_curve(filepath, excerpt_length)
# Parse the output
excerpt, excerpt_words = parse_vocabulary_curve_output(output, excerpt_length)
# Run vocabulary curve analysis with vocab dump for all words
output = run_vocabulary_curve(filepath, excerpt_length, dump_vocab=all_vocab)
# Parse the output (now includes all vocabulary from C)
excerpt, excerpt_words, all_vocab_words = parse_vocabulary_curve_output(output, excerpt_length)
if not excerpt_words:
raise ValueError(f"No words found for excerpt length {excerpt_length}")
@ -312,15 +476,17 @@ def generate_flashcards(
# Find max rank needed
max_rank = max(rank for _, rank in excerpt_words)
# Get ALL words up to max_rank if requested
if all_vocab:
words_with_ranks = get_top_n_words(text, max_rank)
# Use vocabulary from C output
if all_vocab and all_vocab_words:
words_with_ranks = all_vocab_words
else:
words_with_ranks = excerpt_words
# Get contexts if requested
contexts = None
if include_context:
if not text:
text = read_file(filepath)
words = [w for w, _ in words_with_ranks]
contexts = find_word_contexts(text, words)
@ -337,6 +503,22 @@ def generate_flashcards(
deck_name,
include_context,
no_translate,
excerpt,
excerpt_words,
)
# Cache the full deck (if translated)
if not no_translate:
cache_deck(
filepath,
excerpt_length,
target_lang,
include_context,
all_vocab,
anki_content,
excerpt,
len(words_with_ranks),
max_rank,
)
return anki_content, excerpt, len(words_with_ranks), max_rank
@ -361,19 +543,18 @@ def main(argv: Sequence[str] | None = None) -> int:
"--file",
"-f",
type=str,
required=True,
default=None,
help="Path to the text file to analyze",
)
parser.add_argument(
"--length",
"-l",
type=int,
required=True,
default=None,
help="Target excerpt length (how many words you want to understand)",
)
parser.add_argument(
"--from",
"-F",
dest="source_lang",
type=str,
default=None,
@ -425,9 +606,72 @@ def main(argv: Sequence[str] | None = None) -> int:
action="store_true",
help="Skip translation (output words without translations)",
)
parser.add_argument(
"--force",
"-F",
action="store_true",
help="Force regeneration, ignoring all caches",
)
parser.add_argument(
"--cache-stats",
action="store_true",
help="Show cache statistics and exit",
)
parser.add_argument(
"--clear-cache",
action="store_true",
help="Clear all caches and exit",
)
args = parser.parse_args(argv)
# Handle cache management commands
if args.cache_stats:
try:
from python_pkg.word_frequency.cache import get_all_cache_stats
except ImportError:
try:
from cache import get_all_cache_stats
except ImportError:
print("Cache module not available", file=sys.stderr) # noqa: T201
return 1
stats = get_all_cache_stats()
print("Cache Statistics") # noqa: T201
print("=" * 50) # noqa: T201
for cache_name, cache_stats in stats.items():
print(f"\n{cache_name.upper()}:") # noqa: T201
for key, value in cache_stats.items():
if key == "cache_size_bytes":
if value < 1024:
size_str = f"{value} B"
elif value < 1024 * 1024:
size_str = f"{value / 1024:.1f} KB"
else:
size_str = f"{value / (1024 * 1024):.1f} MB"
print(f" {key}: {size_str}") # noqa: T201
else:
print(f" {key}: {value}") # noqa: T201
return 0
if args.clear_cache:
try:
from python_pkg.word_frequency.cache import clear_all_caches
except ImportError:
try:
from cache import clear_all_caches
except ImportError:
print("Cache module not available", file=sys.stderr) # noqa: T201
return 1
clear_all_caches()
print("All caches cleared.") # noqa: T201
return 0
# Validate required arguments for main functionality
if args.file is None:
parser.error("--file/-f is required")
if args.length is None:
parser.error("--length/-l is required")
try:
filepath = Path(args.file)
if not filepath.exists():
@ -448,6 +692,7 @@ def main(argv: Sequence[str] | None = None) -> int:
deck_name=args.deck_name,
all_vocab=not args.excerpt_words_only,
no_translate=args.no_translate,
force=args.force,
)
# Determine output path

View File

@ -0,0 +1,641 @@
#!/usr/bin/env python3
"""Caching utilities for word frequency analysis.
Provides disk-based caching for:
- Translations (word -> translation mappings)
- Vocabulary curve excerpts (file + length -> excerpt + words)
- Generated Anki decks
Cache location: ~/.cache/word_frequency/
"""
from __future__ import annotations
import hashlib
import json
import os
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
pass
# Default cache directory
DEFAULT_CACHE_DIR = Path.home() / ".cache" / "word_frequency"
def get_cache_dir() -> Path:
"""Get the cache directory, creating it if needed.
Returns:
Path to cache directory.
"""
cache_dir = Path(os.environ.get("WORD_FREQ_CACHE_DIR", str(DEFAULT_CACHE_DIR)))
cache_dir.mkdir(parents=True, exist_ok=True)
return cache_dir
def get_file_hash(filepath: Path) -> str:
"""Compute SHA256 hash of a file's contents.
Args:
filepath: Path to file.
Returns:
Hex digest of file hash.
"""
hasher = hashlib.sha256()
with open(filepath, "rb") as f:
# Read in chunks for large files
for chunk in iter(lambda: f.read(65536), b""):
hasher.update(chunk)
return hasher.hexdigest()
def get_text_hash(text: str) -> str:
"""Compute SHA256 hash of text content.
Args:
text: Text to hash.
Returns:
Hex digest of text hash.
"""
return hashlib.sha256(text.encode("utf-8")).hexdigest()
# =============================================================================
# Translation Cache
# =============================================================================
class TranslationCache:
"""Cache for word translations."""
def __init__(self, cache_dir: Path | None = None) -> None:
"""Initialize translation cache.
Args:
cache_dir: Optional custom cache directory.
"""
self.cache_dir = cache_dir or get_cache_dir()
self.cache_file = self.cache_dir / "translations.json"
self._cache: dict[str, str] | None = None
self._dirty = False # Track if cache needs saving
def _load_cache(self) -> dict[str, str]:
"""Load cache from disk."""
if self._cache is None:
if self.cache_file.exists():
try:
self._cache = json.loads(self.cache_file.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
self._cache = {}
else:
self._cache = {}
return self._cache
def _save_cache(self) -> None:
"""Save cache to disk if dirty."""
if self._cache is not None and self._dirty:
self.cache_file.write_text(
json.dumps(self._cache, ensure_ascii=False, indent=2),
encoding="utf-8",
)
self._dirty = False
def flush(self) -> None:
"""Force save cache to disk."""
self._save_cache()
@staticmethod
def _make_key(word: str, source_lang: str, target_lang: str) -> str:
"""Create cache key for a translation.
Args:
word: Word to translate.
source_lang: Source language code.
target_lang: Target language code.
Returns:
Cache key string.
"""
return f"{source_lang}:{target_lang}:{word.lower()}"
def get(
self, word: str, source_lang: str, target_lang: str
) -> str | None:
"""Get cached translation.
Args:
word: Word to look up.
source_lang: Source language code.
target_lang: Target language code.
Returns:
Cached translation or None if not found.
"""
cache = self._load_cache()
key = self._make_key(word, source_lang, target_lang)
return cache.get(key)
def set(
self, word: str, source_lang: str, target_lang: str, translation: str,
*, auto_save: bool = False,
) -> None:
"""Store translation in cache.
Args:
word: Original word.
source_lang: Source language code.
target_lang: Target language code.
translation: Translated word.
auto_save: If True, save to disk immediately.
"""
cache = self._load_cache()
key = self._make_key(word, source_lang, target_lang)
cache[key] = translation
self._dirty = True
if auto_save:
self._save_cache()
def get_many(
self, words: list[str], source_lang: str, target_lang: str
) -> dict[str, str]:
"""Get multiple cached translations.
Args:
words: Words to look up.
source_lang: Source language code.
target_lang: Target language code.
Returns:
Dict mapping words to their cached translations.
"""
cache = self._load_cache()
result: dict[str, str] = {}
for word in words:
key = self._make_key(word, source_lang, target_lang)
if key in cache:
result[word.lower()] = cache[key]
return result
def set_many(
self,
translations: dict[str, str],
source_lang: str,
target_lang: str,
) -> None:
"""Store multiple translations in cache and save to disk.
Args:
translations: Dict mapping words to translations.
source_lang: Source language code.
target_lang: Target language code.
"""
cache = self._load_cache()
for word, translation in translations.items():
key = self._make_key(word, source_lang, target_lang)
cache[key] = translation
self._dirty = True
self._save_cache() # Save once after all additions
def clear(self) -> None:
"""Clear all cached translations."""
self._cache = {}
self._dirty = False
if self.cache_file.exists():
self.cache_file.unlink()
def stats(self) -> dict[str, Any]:
"""Get cache statistics.
Returns:
Dict with cache stats.
"""
cache = self._load_cache()
return {
"total_entries": len(cache),
"cache_file": str(self.cache_file),
"cache_size_bytes": (
self.cache_file.stat().st_size if self.cache_file.exists() else 0
),
}
# =============================================================================
# Vocabulary Curve Cache
# =============================================================================
class VocabCurveCache:
"""Cache for vocabulary curve analysis results."""
def __init__(self, cache_dir: Path | None = None) -> None:
"""Initialize vocabulary curve cache.
Args:
cache_dir: Optional custom cache directory.
"""
self.cache_dir = (cache_dir or get_cache_dir()) / "excerpts"
self.cache_dir.mkdir(parents=True, exist_ok=True)
def _get_cache_path(self, file_hash: str, length: int) -> Path:
"""Get path to cache file for given hash and length.
Args:
file_hash: Hash of source file.
length: Excerpt length.
Returns:
Path to cache file.
"""
return self.cache_dir / f"{file_hash[:16]}_{length}.json"
def get(
self, filepath: Path, length: int
) -> tuple[str, list[tuple[str, int]]] | None:
"""Get cached excerpt and words for a file and length.
Args:
filepath: Path to source file.
length: Excerpt length.
Returns:
Tuple of (excerpt, words_with_ranks) or None if not cached.
"""
file_hash = get_file_hash(filepath)
cache_path = self._get_cache_path(file_hash, length)
if not cache_path.exists():
return None
try:
data = json.loads(cache_path.read_text(encoding="utf-8"))
# Verify hash matches
if data.get("file_hash") != file_hash:
return None
excerpt = data["excerpt"]
words = [(w, r) for w, r in data["words"]]
return excerpt, words
except (json.JSONDecodeError, KeyError, OSError):
return None
def set(
self,
filepath: Path,
length: int,
excerpt: str,
words: list[tuple[str, int]],
) -> None:
"""Store excerpt and words in cache.
Args:
filepath: Path to source file.
length: Excerpt length.
excerpt: The excerpt text.
words: List of (word, rank) tuples.
"""
file_hash = get_file_hash(filepath)
cache_path = self._get_cache_path(file_hash, length)
data = {
"file_hash": file_hash,
"filepath": str(filepath),
"length": length,
"excerpt": excerpt,
"words": [[w, r] for w, r in words],
}
cache_path.write_text(
json.dumps(data, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def clear(self) -> None:
"""Clear all cached excerpts."""
for cache_file in self.cache_dir.glob("*.json"):
cache_file.unlink()
def stats(self) -> dict[str, Any]:
"""Get cache statistics.
Returns:
Dict with cache stats.
"""
cache_files = list(self.cache_dir.glob("*.json"))
total_size = sum(f.stat().st_size for f in cache_files)
return {
"total_entries": len(cache_files),
"cache_dir": str(self.cache_dir),
"cache_size_bytes": total_size,
}
# =============================================================================
# Anki Deck Cache
# =============================================================================
class AnkiDeckCache:
"""Cache for generated Anki decks."""
def __init__(self, cache_dir: Path | None = None) -> None:
"""Initialize Anki deck cache.
Args:
cache_dir: Optional custom cache directory.
"""
self.cache_dir = (cache_dir or get_cache_dir()) / "anki_decks"
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.metadata_file = self.cache_dir / "metadata.json"
self._metadata: dict[str, Any] | None = None
def _load_metadata(self) -> dict[str, Any]:
"""Load metadata from disk."""
if self._metadata is None:
if self.metadata_file.exists():
try:
self._metadata = json.loads(
self.metadata_file.read_text(encoding="utf-8")
)
except (json.JSONDecodeError, OSError):
self._metadata = {}
else:
self._metadata = {}
return self._metadata
def _save_metadata(self) -> None:
"""Save metadata to disk."""
if self._metadata is not None:
self.metadata_file.write_text(
json.dumps(self._metadata, ensure_ascii=False, indent=2),
encoding="utf-8",
)
@staticmethod
def _make_key(
file_hash: str,
length: int,
target_lang: str,
include_context: bool,
all_vocab: bool,
) -> str:
"""Create cache key for an Anki deck.
Args:
file_hash: Hash of source file.
length: Excerpt length.
target_lang: Target language.
include_context: Whether context is included.
all_vocab: Whether all vocab is included.
Returns:
Cache key string.
"""
flags = f"ctx{int(include_context)}_all{int(all_vocab)}"
return f"{file_hash[:16]}_{length}_{target_lang}_{flags}"
def get(
self,
filepath: Path,
length: int,
target_lang: str,
include_context: bool,
all_vocab: bool,
) -> tuple[str, str, int, int] | None:
"""Get cached Anki deck.
Args:
filepath: Path to source file.
length: Excerpt length.
target_lang: Target language.
include_context: Whether context is included.
all_vocab: Whether all vocab is included.
Returns:
Tuple of (anki_content, excerpt, num_words, max_rank) or None.
"""
file_hash = get_file_hash(filepath)
key = self._make_key(file_hash, length, target_lang, include_context, all_vocab)
metadata = self._load_metadata()
if key not in metadata:
return None
entry = metadata[key]
if entry.get("file_hash") != file_hash:
return None
deck_file = self.cache_dir / f"{key}.txt"
if not deck_file.exists():
return None
try:
content = deck_file.read_text(encoding="utf-8")
return (
content,
entry["excerpt"],
entry["num_words"],
entry["max_rank"],
)
except OSError:
return None
def set(
self,
filepath: Path,
length: int,
target_lang: str,
include_context: bool,
all_vocab: bool,
anki_content: str,
excerpt: str,
num_words: int,
max_rank: int,
) -> None:
"""Store Anki deck in cache.
Args:
filepath: Path to source file.
length: Excerpt length.
target_lang: Target language.
include_context: Whether context is included.
all_vocab: Whether all vocab is included.
anki_content: The Anki deck content.
excerpt: The excerpt text.
num_words: Number of words in deck.
max_rank: Maximum word rank.
"""
file_hash = get_file_hash(filepath)
key = self._make_key(file_hash, length, target_lang, include_context, all_vocab)
# Save deck content
deck_file = self.cache_dir / f"{key}.txt"
deck_file.write_text(anki_content, encoding="utf-8")
# Update metadata
metadata = self._load_metadata()
metadata[key] = {
"file_hash": file_hash,
"filepath": str(filepath),
"length": length,
"target_lang": target_lang,
"include_context": include_context,
"all_vocab": all_vocab,
"excerpt": excerpt,
"num_words": num_words,
"max_rank": max_rank,
}
self._save_metadata()
def clear(self) -> None:
"""Clear all cached decks."""
self._metadata = {}
for cache_file in self.cache_dir.glob("*.txt"):
cache_file.unlink()
if self.metadata_file.exists():
self.metadata_file.unlink()
def stats(self) -> dict[str, Any]:
"""Get cache statistics.
Returns:
Dict with cache stats.
"""
metadata = self._load_metadata()
cache_files = list(self.cache_dir.glob("*.txt"))
total_size = sum(f.stat().st_size for f in cache_files)
return {
"total_entries": len(metadata),
"cache_dir": str(self.cache_dir),
"cache_size_bytes": total_size,
}
# =============================================================================
# Global Cache Instances
# =============================================================================
# Singleton instances
_translation_cache: TranslationCache | None = None
_vocab_curve_cache: VocabCurveCache | None = None
_anki_deck_cache: AnkiDeckCache | None = None
def get_translation_cache() -> TranslationCache:
"""Get the global translation cache instance."""
global _translation_cache # noqa: PLW0603
if _translation_cache is None:
_translation_cache = TranslationCache()
return _translation_cache
def get_vocab_curve_cache() -> VocabCurveCache:
"""Get the global vocabulary curve cache instance."""
global _vocab_curve_cache # noqa: PLW0603
if _vocab_curve_cache is None:
_vocab_curve_cache = VocabCurveCache()
return _vocab_curve_cache
def get_anki_deck_cache() -> AnkiDeckCache:
"""Get the global Anki deck cache instance."""
global _anki_deck_cache # noqa: PLW0603
if _anki_deck_cache is None:
_anki_deck_cache = AnkiDeckCache()
return _anki_deck_cache
def clear_all_caches() -> None:
"""Clear all caches."""
get_translation_cache().clear()
get_vocab_curve_cache().clear()
get_anki_deck_cache().clear()
def get_all_cache_stats() -> dict[str, dict[str, Any]]:
"""Get statistics for all caches.
Returns:
Dict with stats for each cache type.
"""
return {
"translations": get_translation_cache().stats(),
"vocab_curves": get_vocab_curve_cache().stats(),
"anki_decks": get_anki_deck_cache().stats(),
}
def main() -> int:
"""CLI for cache management.
Returns:
Exit code.
"""
import argparse
parser = argparse.ArgumentParser(description="Manage word frequency caches")
parser.add_argument(
"--stats", action="store_true", help="Show cache statistics"
)
parser.add_argument(
"--clear", action="store_true", help="Clear all caches"
)
parser.add_argument(
"--clear-translations", action="store_true", help="Clear translation cache"
)
parser.add_argument(
"--clear-excerpts", action="store_true", help="Clear excerpt cache"
)
parser.add_argument(
"--clear-anki", action="store_true", help="Clear Anki deck cache"
)
args = parser.parse_args()
if args.clear:
clear_all_caches()
print("All caches cleared.") # noqa: T201
return 0
if args.clear_translations:
get_translation_cache().clear()
print("Translation cache cleared.") # noqa: T201
return 0
if args.clear_excerpts:
get_vocab_curve_cache().clear()
print("Excerpt cache cleared.") # noqa: T201
return 0
if args.clear_anki:
get_anki_deck_cache().clear()
print("Anki deck cache cleared.") # noqa: T201
return 0
# Default: show stats
stats = get_all_cache_stats()
print("Cache Statistics") # noqa: T201
print("=" * 50) # noqa: T201
for cache_name, cache_stats in stats.items():
print(f"\n{cache_name.upper()}:") # noqa: T201
for key, value in cache_stats.items():
if key == "cache_size_bytes":
# Format as human-readable
if value < 1024:
size_str = f"{value} B"
elif value < 1024 * 1024:
size_str = f"{value / 1024:.1f} KB"
else:
size_str = f"{value / (1024 * 1024):.1f} MB"
print(f" {key}: {size_str}") # noqa: T201
else:
print(f" {key}: {value}") # noqa: T201
return 0
if __name__ == "__main__":
import sys
sys.exit(main())

View File

@ -0,0 +1,153 @@
#!/bin/bash
# Wrapper script for anki_generator that ensures argostranslate is available
#
# Usage: ./run_anki_generator.sh [anki_generator args...]
# Example: ./run_anki_generator.sh --file text.txt --length 20 --from pl --to en
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# Use /tmp for venv to avoid home directory quota issues
VENV_DIR="/tmp/.venv_argos_$(id -u)"
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
log_info() {
echo -e "${GREEN}[INFO]${NC} $1"
}
log_warn() {
echo -e "${YELLOW}[WARN]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Convert relative file paths to absolute before changing directories
resolve_file_paths() {
local args=()
local i=0
while [[ $i -lt ${#ORIGINAL_ARGS[@]} ]]; do
local arg="${ORIGINAL_ARGS[$i]}"
if [[ "$arg" == "--file" || "$arg" == "-f" ]]; then
args+=("$arg")
((i++))
if [[ $i -lt ${#ORIGINAL_ARGS[@]} ]]; then
local file="${ORIGINAL_ARGS[$i]}"
# Convert relative path to absolute
if [[ -f "$file" ]]; then
file="$(cd "$(dirname "$file")" && pwd)/$(basename "$file")"
fi
args+=("$file")
fi
else
args+=("$arg")
fi
((i++))
done
echo "${args[@]}"
}
# Store original args before any directory changes
ORIGINAL_ARGS=("$@")
# Check if argostranslate is available
check_argos() {
python -c "import argostranslate" 2>/dev/null
}
# Try to install argostranslate using pipx (system-wide)
try_pipx_install() {
if command -v pipx &>/dev/null; then
log_info "Trying pipx install argostranslate..."
if pipx install argostranslate 2>/dev/null; then
log_info "argostranslate installed via pipx"
return 0
fi
fi
return 1
}
# Create/use a virtualenv for argostranslate
setup_venv() {
# Use /tmp for pip cache to avoid home directory quota issues
export PIP_CACHE_DIR="/tmp/.pip_cache_$(id -u)"
mkdir -p "$PIP_CACHE_DIR"
if [[ ! -d "$VENV_DIR" ]]; then
log_info "Creating virtual environment at $VENV_DIR..."
python -m venv "$VENV_DIR"
fi
# Activate venv
source "$VENV_DIR/bin/activate"
# Install argostranslate if not present
if ! python -c "import argostranslate" 2>/dev/null; then
log_info "Installing argostranslate in virtualenv (this may take a few minutes)..."
# Use CPU-only PyTorch to reduce download size significantly (~200MB vs ~900MB)
# Use --no-cache-dir to avoid any cache writes to home directory
pip install --progress-bar on --no-cache-dir torch --index-url https://download.pytorch.org/whl/cpu
pip install --progress-bar on --no-cache-dir argostranslate
fi
# Install langdetect for auto language detection
if ! python -c "import langdetect" 2>/dev/null; then
log_info "Installing langdetect for auto language detection..."
pip install --progress-bar on --no-cache-dir langdetect
fi
# Also ensure other dependencies are available
if [[ -f "${SCRIPT_DIR}/../../requirements.txt" ]]; then
pip install --progress-bar on --no-cache-dir -r "${SCRIPT_DIR}/../../requirements.txt" 2>/dev/null || true
fi
log_info "Using virtualenv: $VENV_DIR"
}
# Main logic
main() {
# Resolve file paths to absolute before changing directories
local resolved_args
resolved_args=$(resolve_file_paths)
# If --no-translate is passed, we don't need argostranslate
if [[ " $* " =~ " --no-translate " ]] || [[ " $* " =~ " -n " ]]; then
log_info "Running without translation (--no-translate)"
cd "$(dirname "$SCRIPT_DIR")" && cd ..
python -m python_pkg.word_frequency.anki_generator $resolved_args
exit $?
fi
# Check if argostranslate is already available
if check_argos; then
log_info "argostranslate is available"
cd "$(dirname "$SCRIPT_DIR")" && cd ..
python -m python_pkg.word_frequency.anki_generator $resolved_args
exit $?
fi
log_warn "argostranslate not found in system Python"
# Try pipx first (cleaner system-wide installation)
if try_pipx_install && check_argos; then
cd "$(dirname "$SCRIPT_DIR")" && cd ..
python -m python_pkg.word_frequency.anki_generator $resolved_args
exit $?
fi
# Fall back to virtualenv
log_info "Setting up virtualenv with argostranslate..."
setup_venv
# Run in venv context
cd "$(dirname "$SCRIPT_DIR")" && cd ..
python -m python_pkg.word_frequency.anki_generator $resolved_args
}
main "$@"

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,6 @@ try:
find_word_contexts,
generate_anki_deck,
generate_flashcards,
get_top_n_words,
main,
parse_vocabulary_curve_output,
)
@ -24,7 +23,6 @@ except ImportError:
find_word_contexts,
generate_anki_deck,
generate_flashcards,
get_top_n_words,
main,
parse_vocabulary_curve_output,
)
@ -80,30 +78,44 @@ class TestParseVocabularyCurveOutput:
def test_parse_length_1(self, sample_vocabulary_output: str) -> None:
"""Test parsing output for length 1."""
excerpt, words = parse_vocabulary_curve_output(sample_vocabulary_output, 1)
excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output(sample_vocabulary_output, 1)
assert excerpt == "the"
assert words == [("the", 1)]
assert excerpt_words == [("the", 1)]
def test_parse_length_2(self, sample_vocabulary_output: str) -> None:
"""Test parsing output for length 2."""
excerpt, words = parse_vocabulary_curve_output(sample_vocabulary_output, 2)
excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output(sample_vocabulary_output, 2)
assert excerpt == "the dog"
assert words == [("the", 1), ("dog", 2)]
assert excerpt_words == [("the", 1), ("dog", 2)]
def test_parse_length_3(self, sample_vocabulary_output: str) -> None:
"""Test parsing output for length 3."""
excerpt, words = parse_vocabulary_curve_output(sample_vocabulary_output, 3)
excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output(sample_vocabulary_output, 3)
assert excerpt == "the quick fox"
assert len(words) == 3
assert ("the", 1) in words
assert ("quick", 3) in words
assert ("fox", 5) in words
assert len(excerpt_words) == 3
assert ("the", 1) in excerpt_words
assert ("quick", 3) in excerpt_words
assert ("fox", 5) in excerpt_words
def test_parse_nonexistent_length(self, sample_vocabulary_output: str) -> None:
"""Test parsing output for non-existent length."""
excerpt, words = parse_vocabulary_curve_output(sample_vocabulary_output, 100)
excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output(sample_vocabulary_output, 100)
assert excerpt == ""
assert words == []
assert excerpt_words == []
def test_parse_vocab_dump(self) -> None:
"""Test parsing VOCAB_DUMP section."""
output = """[Length 2] Vocab needed: 2
Excerpt: "hello world"
Words: hello(#1), world(#2)
VOCAB_DUMP_START
hello;1
world;2
VOCAB_DUMP_END
"""
excerpt, excerpt_words, all_vocab = parse_vocabulary_curve_output(output, 2)
assert all_vocab == [("hello", 1), ("world", 2)]
# Tests for find_word_contexts
@ -250,31 +262,6 @@ class TestGenerateAnkiDeck:
assert "world" in result
# Tests for get_top_n_words
class TestGetTopNWords:
"""Tests for getting top N words."""
def test_get_top_5_words(self) -> None:
"""Test getting top 5 words from text."""
text = "the cat sat on the mat the cat meowed"
words = get_top_n_words(text, 5)
assert len(words) == 5
# 'the' appears 3x, 'cat' appears 2x
assert words[0][0] == "the"
assert words[0][1] == 1
assert words[1][0] == "cat"
assert words[1][1] == 2
def test_ranks_are_sequential(self) -> None:
"""Test that ranks are 1-based and sequential."""
text = "one two three four five six seven eight"
words = get_top_n_words(text, 8)
ranks = [r for _, r in words]
assert ranks == [1, 2, 3, 4, 5, 6, 7, 8]
# Tests for main function

View File

@ -4,6 +4,8 @@ from __future__ import annotations
import time
from pathlib import Path
from typing import TYPE_CHECKING
from unittest.mock import MagicMock, patch
import pytest
@ -13,6 +15,40 @@ from python_pkg.word_frequency.learning_pipe import (
load_stopwords,
main,
)
import python_pkg.word_frequency.learning_pipe as learning_pipe_module
from python_pkg.word_frequency.translator import TranslationResult
if TYPE_CHECKING:
from collections.abc import Generator
@pytest.fixture
def mock_translation() -> Generator[MagicMock, None, None]:
"""Mock translation to avoid requiring argostranslate."""
def fake_batch_translate(
words: list[str],
from_lang: str,
to_lang: str,
*,
use_cache: bool = True, # noqa: ARG001
) -> list[TranslationResult]:
"""Fake batch translation that returns word with prefix."""
return [
TranslationResult(
source_word=word,
translated_word=f"translated_{word}",
source_lang=from_lang,
target_lang=to_lang,
success=True,
)
for word in words
]
# Need to patch in learning_pipe module since it imports the function directly
with patch.object(
learning_pipe_module, "translate_words_batch", side_effect=fake_batch_translate
):
yield
class TestLoadStopwords:
@ -162,7 +198,9 @@ class TestGenerateLearningLesson:
class TestMain:
"""Tests for main CLI function."""
def test_basic_text_input(self, capsys: pytest.CaptureFixture[str]) -> None:
def test_basic_text_input(
self, capsys: pytest.CaptureFixture[str], mock_translation: None
) -> None:
"""Test with text input."""
exit_code = main(
[
@ -179,7 +217,7 @@ class TestMain:
assert "LANGUAGE LEARNING LESSON" in captured.out
def test_file_input(
self, tmp_path: Path, capsys: pytest.CaptureFixture[str]
self, tmp_path: Path, capsys: pytest.CaptureFixture[str], mock_translation: None
) -> None:
"""Test with file input."""
test_file = tmp_path / "test.txt"
@ -199,7 +237,7 @@ class TestMain:
assert exit_code == 0
assert "hello" in captured.out.lower()
def test_output_to_file(self, tmp_path: Path) -> None:
def test_output_to_file(self, tmp_path: Path, mock_translation: None) -> None:
"""Test outputting to file."""
output_file = tmp_path / "lesson.txt"
@ -219,7 +257,7 @@ class TestMain:
assert "LANGUAGE LEARNING LESSON" in content
def test_custom_stopwords(
self, tmp_path: Path, capsys: pytest.CaptureFixture[str]
self, tmp_path: Path, capsys: pytest.CaptureFixture[str], mock_translation: None
) -> None:
"""Test with custom stopwords file."""
stopwords_file = tmp_path / "stop.txt"
@ -242,7 +280,7 @@ class TestMain:
# "hello" should be filtered by custom stopwords
def test_multiple_batches_option(
self, capsys: pytest.CaptureFixture[str]
self, capsys: pytest.CaptureFixture[str], mock_translation: None
) -> None:
"""Test --batches option."""
text = " ".join(f"word{i}" * (50 - i) for i in range(30))
@ -329,10 +367,10 @@ class TestTranslationIntegration:
# Should not have translation arrows
assert " -> " not in result or "Translation" not in result
def test_lesson_with_translation_params(self) -> None:
def test_lesson_with_translation_params(self, mock_translation: None) -> None:
"""Test that translation params are accepted."""
text = "hello world hello world hello"
# This should not crash even without argostranslate installed
# This should work with mocked translation
result = generate_learning_lesson(
text,
batch_size=5,
@ -346,12 +384,14 @@ class TestTranslationIntegration:
assert "VOCABULARY TO LEARN:" in result
assert "hello" in result
def test_main_with_translate_flags(self, tmp_path: Path) -> None:
def test_main_with_translate_flags(
self, tmp_path: Path, mock_translation: None
) -> None:
"""Test that main accepts translation flags."""
text_file = tmp_path / "test.txt"
text_file.write_text("hello world hello world hello", encoding="utf-8")
# Should not crash even if translation fails
# Should work with mocked translation
result = main([
"--file", str(text_file),
"--translate-from", "en",
@ -361,7 +401,9 @@ class TestTranslationIntegration:
assert result == 0
def test_translate_to_defaults_to_english(self, capsys: pytest.CaptureFixture[str]) -> None:
def test_translate_to_defaults_to_english(
self, capsys: pytest.CaptureFixture[str], mock_translation: None
) -> None:
"""Test that translate_to defaults to 'en' when using auto-detection."""
text = "hello world"
# When using --translate flag (translate_from="auto"), translate_to defaults to "en"

View File

@ -47,15 +47,22 @@ except ImportError:
# Helper context manager for mocking argostranslate
class ArgosAvailableMock:
"""Context manager to mock argostranslate being available."""
"""Context manager to mock argostranslate being available and control its output.
Works whether argos is installed or not by patching sys.modules.
"""
def __init__(self, translate_returns: str | list[str] | Exception | None = None) -> None:
"""Initialize with return values for translate()."""
self.translate_returns = translate_returns
self.mock_translate_fn = MagicMock()
self.mock_translate_module = MagicMock()
self.mock_package_module = MagicMock()
self.mock_parent = MagicMock()
self.original_available = translator._argos_available
self._sys_modules_patcher: MagicMock | None = None
self._ensure_patcher: MagicMock | None = None
self._lang_patcher: MagicMock | None = None
def __enter__(self) -> MagicMock:
"""Set up the mocks."""
@ -63,36 +70,52 @@ class ArgosAvailableMock:
# Set up translate return value
if isinstance(self.translate_returns, Exception):
self.mock_translate_module.translate.side_effect = self.translate_returns
self.mock_translate_fn.side_effect = self.translate_returns
elif isinstance(self.translate_returns, list):
self.mock_translate_module.translate.side_effect = self.translate_returns
self.mock_translate_fn.side_effect = self.translate_returns
elif self.translate_returns is not None:
self.mock_translate_module.translate.return_value = self.translate_returns
self.mock_translate_fn.return_value = self.translate_returns
# Link parent module to submodules (critical for Python imports)
# Wire up the mock modules
self.mock_translate_module.translate = self.mock_translate_fn
self.mock_translate_module.get_installed_languages = MagicMock(return_value=[])
self.mock_package_module.update_package_index = MagicMock()
self.mock_package_module.get_available_packages = MagicMock(return_value=[])
self.mock_parent.translate = self.mock_translate_module
self.mock_parent.package = self.mock_package_module
# Patch sys.modules
self.patchers = [
patch.dict(
# Patch sys.modules to inject our mock (works even if argos not installed)
self._sys_modules_patcher = patch.dict(
"sys.modules",
{
"argostranslate": self.mock_parent,
"argostranslate.translate": self.mock_translate_module,
"argostranslate.package": self.mock_package_module,
},
),
]
for p in self.patchers:
p.start()
)
return self.mock_translate_module
# Patch _ensure_argos_installed and _ensure_language_pair to no-op
self._ensure_patcher = patch.object(
translator, "_ensure_argos_installed", lambda: None
)
self._lang_patcher = patch.object(
translator, "_ensure_language_pair", lambda f, t: None
)
self._sys_modules_patcher.start()
self._ensure_patcher.start()
self._lang_patcher.start()
return self.mock_translate_fn
def __exit__(self, *args: object) -> None:
"""Restore original state."""
for p in self.patchers:
p.stop()
if self._lang_patcher:
self._lang_patcher.stop()
if self._ensure_patcher:
self._ensure_patcher.stop()
if self._sys_modules_patcher:
self._sys_modules_patcher.stop()
translator._argos_available = self.original_available
@ -101,25 +124,13 @@ class ArgosAvailableMock:
@pytest.fixture
def mock_argos_unavailable() -> Generator[None, None, None]:
"""Mock argostranslate being unavailable."""
"""Mock argostranslate being unavailable (for legacy tests)."""
original_value = translator._argos_available
translator._argos_available = False
yield
translator._argos_available = original_value
@pytest.fixture
def mock_all_translators_unavailable() -> Generator[None, None, None]:
"""Mock both argostranslate and deep-translator being unavailable."""
original_argos = translator._argos_available
original_deep = translator._deep_translator_available
translator._argos_available = False
translator._deep_translator_available = False
yield
translator._argos_available = original_argos
translator._deep_translator_available = original_deep
@pytest.fixture
def temp_words_file(tmp_path: Path) -> Path:
"""Create a temporary file with words."""
@ -174,43 +185,36 @@ class TestTranslationResult:
class TestTranslateWord:
"""Tests for translate_word function."""
"""Tests for translate_word function - offline-first behavior."""
def test_translate_word_all_backends_unavailable(
self, mock_all_translators_unavailable: None
) -> None:
"""Test translation when no backends are available."""
result = translate_word("hello", "en", "es")
assert result.success is False
assert "No translation backend" in str(result.error)
def test_translate_word_argos_unavailable_uses_deep_translator(
self, mock_argos_unavailable: None
) -> None:
"""Test that deep-translator is used when argos is unavailable."""
# deep-translator should work as fallback (it's installed)
result = translate_word("hello", "en", "es")
# This may succeed if deep-translator is installed
# Just verify we get a result without crashing
assert isinstance(result, TranslationResult)
def test_translate_word_argos_unavailable_raises(self) -> None:
"""Test that translation raises ImportError when argos is unavailable."""
# Mock _ensure_argos_installed to raise ImportError
with patch.object(
translator,
"_ensure_argos_installed",
side_effect=ImportError("argostranslate not available"),
):
with pytest.raises(ImportError, match="argostranslate not available"):
translate_word("hello", "en", "es", use_cache=False)
def test_translate_word_success(self) -> None:
"""Test successful word translation."""
with ArgosAvailableMock("hola"):
result = translate_word("hello", "en", "es")
result = translate_word("hello", "en", "es", use_cache=False)
assert result.source_word == "hello"
assert result.translated_word == "hola"
assert result.success is True
def test_translate_word_argos_exception_falls_back(
self, mock_argos_unavailable: None
) -> None:
"""Test that argos exception falls back to deep-translator."""
# With argos unavailable, deep-translator should be used
result = translate_word("hello", "en", "es")
# Just verify it doesn't crash - may succeed or fail depending on network
assert isinstance(result, TranslationResult)
def test_translate_word_argos_exception_returns_error(self) -> None:
"""Test that argos exception returns failed result with error."""
# Mock argos being available but translate raising an exception
with ArgosAvailableMock(RuntimeError("Translation failed")):
result = translate_word("hello", "en", "es", use_cache=False)
assert result.success is False
assert "Translation failed" in str(result.error)
# translate_words tests
@ -221,99 +225,123 @@ class TestTranslateWords:
def test_translate_empty_list(self) -> None:
"""Test translating empty list."""
# Empty list returns empty result without calling translation
results = translate_words([], "en", "es")
assert results == []
def test_translate_multiple_words(self) -> None:
"""Test translating multiple words."""
with ArgosAvailableMock(["hola", "mundo"]):
results = translate_words(["hello", "world"], "en", "es")
with ArgosAvailableMock(["hola", "mundo"]) as mock:
mock.side_effect = ["hola", "mundo"]
results = translate_words(["hello", "world"], "en", "es", use_cache=False)
assert len(results) == 2
assert results[0].translated_word == "hola"
assert results[1].translated_word == "mundo"
def test_translate_words_argos_unavailable_raises(self) -> None:
"""Test that translating words raises ImportError when argos unavailable."""
with patch.object(
translator,
"_ensure_argos_installed",
side_effect=ImportError("argostranslate not available"),
):
with pytest.raises(ImportError, match="argostranslate not available"):
translate_words(["hello", "world"], "en", "es", use_cache=False)
# translate_words_batch tests
class TestTranslateWordsBatch:
"""Tests for translate_words_batch function."""
"""Tests for translate_words_batch function - offline-first."""
def test_batch_empty_list(self) -> None:
"""Test batch translation of empty list."""
# Empty list doesn't require argos
with patch.object(translator, "_ensure_argos_installed", lambda: None):
results = translate_words_batch([], "en", "es")
assert results == []
def test_batch_small_list(self) -> None:
"""Test batch translation of small list (3 or fewer)."""
with ArgosAvailableMock(["uno", "dos", "tres"]) as mock:
results = translate_words_batch(["one", "two", "three"], "en", "es")
"""Test batch translation of small list (uses batch mode anyway)."""
with ArgosAvailableMock("uno\ndos\ntres") as mock:
results = translate_words_batch(
["one", "two", "three"], "en", "es", use_cache=False
)
assert len(results) == 3
# Small lists use individual translation
assert mock.translate.call_count == 3
# Batch translation
assert mock.call_count == 1
def test_batch_large_list_success(self) -> None:
"""Test batch translation of large list."""
words = ["one", "two", "three", "four", "five"]
with ArgosAvailableMock("uno\ndos\ntres\ncuatro\ncinco") as mock:
results = translate_words_batch(words, "en", "es")
results = translate_words_batch(words, "en", "es", use_cache=False)
assert len(results) == 5
# Batch translation called once
mock.translate.assert_called_once()
mock.assert_called_once()
assert results[0].translated_word == "uno"
assert results[4].translated_word == "cinco"
def test_batch_fallback_on_mismatch(self) -> None:
"""Test batch translation falls back when result count mismatches."""
"""Test batch translation falls back to individual when result count mismatches."""
words = ["one", "two", "three", "four"]
# First call (batch) returns wrong count, subsequent calls are individual
with ArgosAvailableMock(
["wrong\ncount", "uno", "dos", "tres", "cuatro"]
["wrong", "uno", "dos", "tres", "cuatro"]
) as mock:
results = translate_words_batch(words, "en", "es")
results = translate_words_batch(words, "en", "es", use_cache=False)
assert len(results) == 4
# Fallback to individual
assert mock.translate.call_count == 5
# Fallback to individual argos translation
assert mock.call_count == 5
def test_batch_fallback_on_exception(self) -> None:
"""Test batch translation falls back on exception."""
"""Test batch translation raises on exception (no fallback to online)."""
words = ["one", "two", "three", "four"]
# Create mock that raises first then succeeds
original = translator._argos_available
translator._argos_available = True
# Create mock that raises
mock_translate = MagicMock(side_effect=RuntimeError("Batch failed"))
mock_translate_module = MagicMock()
mock_translate_module.translate.side_effect = [
RuntimeError("Batch failed"),
"uno",
"dos",
"tres",
"cuatro",
]
mock_translate_module.translate = mock_translate
mock_package_module = MagicMock()
mock_parent = MagicMock()
mock_parent.translate = mock_translate_module
mock_parent.package = mock_package_module
with patch.dict(
original = translator._argos_available
translator._argos_available = True
with (
patch.dict(
"sys.modules",
{
"argostranslate": mock_parent,
"argostranslate.translate": mock_translate_module,
"argostranslate.package": mock_package_module,
},
),
patch.object(translator, "_ensure_argos_installed", lambda: None),
patch.object(translator, "_ensure_language_pair", lambda f, t: None),
pytest.raises(RuntimeError, match="Translation failed"),
):
results = translate_words_batch(words, "en", "es")
translate_words_batch(words, "en", "es", use_cache=False)
translator._argos_available = original
assert len(results) == 4
def test_batch_argos_unavailable_raises(self) -> None:
"""Test that batch translation raises ImportError when argos unavailable."""
with patch.object(
translator,
"_ensure_argos_installed",
side_effect=ImportError("argostranslate not available"),
):
with pytest.raises(ImportError, match="argostranslate not available"):
translate_words_batch(["hello", "world"], "en", "es", use_cache=False)
# format_translations tests
@ -394,10 +422,31 @@ class TestGetInstalledLanguages:
mock_lang2.code = "es"
mock_lang2.name = "Spanish"
with ArgosAvailableMock() as mock:
mock.get_installed_languages.return_value = [mock_lang1, mock_lang2]
# We need to mock the translate module's get_installed_languages
mock_translate_module = MagicMock()
mock_translate_module.get_installed_languages.return_value = [
mock_lang1, mock_lang2
]
mock_package_module = MagicMock()
mock_parent = MagicMock()
mock_parent.translate = mock_translate_module
mock_parent.package = mock_package_module
original = translator._argos_available
translator._argos_available = True
with patch.dict(
"sys.modules",
{
"argostranslate": mock_parent,
"argostranslate.translate": mock_translate_module,
"argostranslate.package": mock_package_module,
},
):
result = get_installed_languages()
translator._argos_available = original
assert ("en", "English") in result
assert ("es", "Spanish") in result
@ -462,10 +511,28 @@ class TestMain:
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""Test listing languages when none installed."""
with ArgosAvailableMock() as mock:
mock.get_installed_languages.return_value = []
mock_translate_module = MagicMock()
mock_translate_module.get_installed_languages.return_value = []
mock_package_module = MagicMock()
mock_parent = MagicMock()
mock_parent.translate = mock_translate_module
mock_parent.package = mock_package_module
original = translator._argos_available
translator._argos_available = True
with patch.dict(
"sys.modules",
{
"argostranslate": mock_parent,
"argostranslate.translate": mock_translate_module,
"argostranslate.package": mock_package_module,
},
):
result = main(["--list-languages"])
translator._argos_available = original
assert result == 0
captured = capsys.readouterr()
assert "No languages installed" in captured.out
@ -478,10 +545,28 @@ class TestMain:
mock_lang.code = "en"
mock_lang.name = "English"
with ArgosAvailableMock() as mock:
mock.get_installed_languages.return_value = [mock_lang]
mock_translate_module = MagicMock()
mock_translate_module.get_installed_languages.return_value = [mock_lang]
mock_package_module = MagicMock()
mock_parent = MagicMock()
mock_parent.translate = mock_translate_module
mock_parent.package = mock_package_module
original = translator._argos_available
translator._argos_available = True
with patch.dict(
"sys.modules",
{
"argostranslate": mock_parent,
"argostranslate.translate": mock_translate_module,
"argostranslate.package": mock_package_module,
},
):
result = main(["--list-languages"])
translator._argos_available = original
assert result == 0
captured = capsys.readouterr()
assert "en" in captured.out
@ -578,10 +663,13 @@ class TestMain:
assert result == 1
def test_translation_failure_returns_error(
self, mock_all_translators_unavailable: None
) -> None:
"""Test that translation failure returns error code when no backends."""
def test_translation_failure_returns_error(self) -> None:
"""Test that translation failure returns error code when argos unavailable."""
with patch.object(
translator,
"_ensure_argos_installed",
side_effect=ImportError("argostranslate not available"),
):
result = main(["--text", "hello", "--from", "en", "--to", "es"])
assert result == 1
@ -594,9 +682,10 @@ class TestIntegration:
def test_full_translation_flow(self) -> None:
"""Test complete translation flow."""
with ArgosAvailableMock(["uno", "dos", "tres"]):
with ArgosAvailableMock(["uno", "dos", "tres"]) as mock:
mock.side_effect = ["uno", "dos", "tres"]
words = ["one", "two", "three"]
results = translate_words(words, "en", "es")
results = translate_words(words, "en", "es", use_cache=False)
assert all(r.success for r in results)
assert [r.translated_word for r in results] == ["uno", "dos", "tres"]
@ -606,14 +695,19 @@ class TestIntegration:
assert "one" in output
assert "uno" in output
def test_mixed_success_failure(
self, mock_all_translators_unavailable: None
) -> None:
"""Test handling when no translation backends are available."""
results = translate_words(["hello", "xyz", "world"], "en", "es")
def test_mixed_success_failure(self) -> None:
"""Test handling when argos raises exception for some translations."""
# Simulate argos translating first word, then failing, then succeeding
with ArgosAvailableMock() as mock:
mock.side_effect = ["hola", RuntimeError("Unknown"), "mundo"]
results = translate_words(
["hello", "xyz", "world"], "en", "es", use_cache=False
)
# All should fail when no backends available
assert all(not r.success for r in results)
# First and third succeed, second fails
assert results[0].success is True
assert results[1].success is False
assert results[2].success is True
output = format_translations(results)
assert "Error" in output

View File

@ -40,6 +40,65 @@ if TYPE_CHECKING:
_argos_available: bool | None = None
_deep_translator_available: bool | None = None
_langdetect_available: bool | None = None
_gpu_initialized: bool = False
_gpu_available: bool | None = None
def _check_cuda_available() -> bool:
"""Check if CUDA is available for GPU acceleration."""
global _gpu_available
if _gpu_available is None:
try:
import torch
_gpu_available = torch.cuda.is_available()
except ImportError:
_gpu_available = False
return _gpu_available
def _init_gpu_if_available() -> None:
"""Initialize GPU for argostranslate if CUDA is available.
Raises:
RuntimeError: If CUDA is available but GPU initialization fails.
"""
global _gpu_initialized
if _gpu_initialized:
return
if not _check_cuda_available():
_gpu_initialized = True
return
import sys
print("CUDA detected, initializing GPU acceleration...", file=sys.stderr)
try:
import torch
import ctranslate2
# Force CTranslate2 to use CUDA
device_count = torch.cuda.device_count()
if device_count == 0:
raise RuntimeError("CUDA reports available but no GPU devices found")
device_name = torch.cuda.get_device_name(0)
print(f" Using GPU: {device_name}", file=sys.stderr)
# Set environment variable to force GPU usage in argos
import os
os.environ["CT2_CUDA_ALLOW_FP16"] = "1"
os.environ["CT2_USE_EXPERIMENTAL_PACKED_GEMM"] = "1"
_gpu_initialized = True
print(" GPU acceleration enabled.", file=sys.stderr)
except Exception as e:
raise RuntimeError(
f"CUDA is available but GPU initialization failed: {e}\n"
f"This may be due to incompatible CUDA version or driver issues.\n"
f"To disable GPU and use CPU only, set environment variable: CT2_FORCE_CPU=1"
) from e
def _check_argos() -> bool:
@ -205,53 +264,163 @@ def download_languages(lang_codes: Sequence[str]) -> dict[str, bool]:
return results
def _ensure_argos_installed() -> None:
"""Ensure argostranslate is installed, attempt installation if not.
Raises:
ImportError: If argos cannot be installed.
"""
if _check_argos():
return
import subprocess
import sys
print("argostranslate not found. Attempting to install...") # noqa: T201
try:
subprocess.run(
[sys.executable, "-m", "pip", "install", "argostranslate"],
check=True,
capture_output=True,
)
# Reset the check flag and verify
global _argos_available # noqa: PLW0603
_argos_available = None
if not _check_argos():
raise ImportError("argostranslate installation succeeded but import failed")
print("argostranslate installed successfully.") # noqa: T201
except subprocess.CalledProcessError as e:
error_msg = e.stderr.decode() if e.stderr else str(e)
raise ImportError(
f"argostranslate is required for offline translation.\n\n"
f"Install manually with one of:\n"
f" pip install argostranslate # In a virtualenv\n"
f" pipx install argostranslate # System-wide via pipx\n"
f" pacman -S python-argostranslate # Arch Linux (if available)\n\n"
f"Original error: {error_msg}"
) from e
def _ensure_language_pair(from_lang: str, to_lang: str) -> None:
"""Ensure the language pair is available, download if needed.
Args:
from_lang: Source language code.
to_lang: Target language code.
Raises:
ValueError: If language pair cannot be obtained.
"""
import argostranslate.package
import argostranslate.translate
# Check if already installed
installed_languages = argostranslate.translate.get_installed_languages()
from_lang_obj = None
to_lang_obj = None
for lang in installed_languages:
if lang.code == from_lang:
from_lang_obj = lang
if lang.code == to_lang:
to_lang_obj = lang
if from_lang_obj and to_lang_obj:
# Check if translation is available
translation = from_lang_obj.get_translation(to_lang_obj)
if translation:
return # Already available
# Need to download
import sys
print(
f"Downloading language pack: {from_lang} -> {to_lang}...",
file=sys.stderr,
)
print(" Fetching package index...", file=sys.stderr)
argostranslate.package.update_package_index()
available = argostranslate.package.get_available_packages()
pkg = next(
(p for p in available if p.from_code == from_lang and p.to_code == to_lang),
None,
)
if pkg is None:
raise ValueError(
f"No language pack available for {from_lang} -> {to_lang}. "
f"Available pairs can be listed with --list-languages."
)
print(
f" Downloading package (~50-100MB, this may take a minute)...",
file=sys.stderr,
)
download_path = pkg.download()
print(" Installing language pack...", file=sys.stderr)
argostranslate.package.install_from_path(download_path)
print(
f"Language pack {from_lang} -> {to_lang} installed.",
file=sys.stderr,
)
def translate_word(
word: str,
from_lang: str,
to_lang: str,
*,
use_cache: bool = True,
) -> TranslationResult:
"""Translate a single word.
Uses argostranslate if available (offline), otherwise falls back to
deep-translator (Google Translate, online).
"""Translate a single word using argostranslate (offline).
Args:
word: The word to translate.
from_lang: Source language code (e.g., 'en', 'pl', 'la').
to_lang: Target language code.
use_cache: Whether to use/update translation cache.
Returns:
TranslationResult with the translation.
"""
# Try argostranslate first (offline)
if _check_argos():
import argostranslate.translate
Raises:
ImportError: If argostranslate is not available and cannot be installed.
"""
# Check cache first
if use_cache:
try:
translated = argostranslate.translate.translate(word, from_lang, to_lang)
from python_pkg.word_frequency.cache import get_translation_cache
cache = get_translation_cache()
cached = cache.get(word, from_lang, to_lang)
if cached is not None:
return TranslationResult(
source_word=word,
translated_word=translated,
translated_word=cached,
source_lang=from_lang,
target_lang=to_lang,
success=True,
)
except Exception as e: # noqa: BLE001
# Fall through to try deep-translator
argos_error = str(e)
else:
argos_error = None
except ImportError:
pass # Cache not available
# Try deep-translator (online via Google Translate)
if _check_deep_translator():
from deep_translator import GoogleTranslator
# Ensure argos is installed (will raise if it can't be)
_ensure_argos_installed()
import argostranslate.translate
try:
translator = GoogleTranslator(source=from_lang, target=to_lang)
translated = translator.translate(word)
translated = argostranslate.translate.translate(word, from_lang, to_lang)
# Cache the result
if use_cache:
try:
from python_pkg.word_frequency.cache import get_translation_cache
get_translation_cache().set(word, from_lang, to_lang, translated)
except ImportError:
pass
return TranslationResult(
source_word=word,
translated_word=translated or "",
translated_word=translated,
source_lang=from_lang,
target_lang=to_lang,
success=True,
@ -266,24 +435,13 @@ def translate_word(
error=str(e),
)
# Neither backend available
error_msg = "No translation backend available. Install: pip install deep-translator"
if argos_error:
error_msg = f"argostranslate error: {argos_error}"
return TranslationResult(
source_word=word,
translated_word="",
source_lang=from_lang,
target_lang=to_lang,
success=False,
error=error_msg,
)
def translate_words(
words: Sequence[str],
from_lang: str,
to_lang: str,
*,
use_cache: bool = True,
) -> list[TranslationResult]:
"""Translate multiple words.
@ -291,69 +449,187 @@ def translate_words(
words: List of words to translate.
from_lang: Source language code.
to_lang: Target language code.
use_cache: Whether to use translation cache.
Returns:
List of TranslationResult for each word.
"""
return [translate_word(word, from_lang, to_lang) for word in words]
return [translate_word(word, from_lang, to_lang, use_cache=use_cache) for word in words]
def translate_words_batch(
words: Sequence[str],
def _translate_batch_worker(
batch_words: list[str],
from_lang: str,
to_lang: str,
) -> list[TranslationResult]:
"""Translate multiple words, attempting batch translation for efficiency.
For better results with context, this joins words and translates together,
then splits. Falls back to word-by-word if batch fails.
batch_idx: int,
) -> tuple[int, dict[str, str]]:
"""Worker function to translate a batch of words.
Args:
words: List of words to translate.
batch_words: Words to translate in this batch.
from_lang: Source language code.
to_lang: Target language code.
batch_idx: Index of this batch (for ordering results).
Returns:
List of TranslationResult for each word.
Tuple of (batch_idx, translations dict).
"""
if not words:
return []
# For single words or small batches, just translate individually
if len(words) <= 3:
return translate_words(words, from_lang, to_lang)
# Try batch translation by joining with newlines
if not _check_argos():
return translate_words(words, from_lang, to_lang)
import argostranslate.translate
try:
# Join words with newlines for batch translation
batch_text = "\n".join(words)
translations: dict[str, str] = {}
# Batch translate by joining with newlines
batch_text = "\n".join(batch_words)
translated_batch = argostranslate.translate.translate(
batch_text, from_lang, to_lang
)
translated_words = translated_batch.split("\n")
# If we got the same number of translations, use them
if len(translated_words) == len(words):
return [
TranslationResult(
source_word=word,
translated_word=trans.strip(),
source_lang=from_lang,
target_lang=to_lang,
success=True,
if len(translated_words) == len(batch_words):
for word, trans in zip(batch_words, translated_words, strict=True):
translations[word.lower()] = trans.strip()
else:
# Fall back to individual translation for this batch
for word in batch_words:
translated = argostranslate.translate.translate(
word, from_lang, to_lang
)
for word, trans in zip(words, translated_words, strict=True)
]
except Exception: # noqa: BLE001, S110
translations[word.lower()] = translated
return batch_idx, translations
def translate_words_batch(
words: Sequence[str],
from_lang: str,
to_lang: str,
*,
use_cache: bool = True,
) -> list[TranslationResult]:
"""Translate multiple words using argostranslate (offline).
Uses small batch translation for efficiency with frequent progress updates.
Requires argostranslate. Will use GPU if CUDA is available.
Args:
words: List of words to translate.
from_lang: Source language code.
to_lang: Target language code.
use_cache: Whether to use translation cache.
Returns:
List of TranslationResult for each word.
Raises:
ImportError: If argostranslate is not available and cannot be installed.
RuntimeError: If CUDA is available but GPU initialization fails.
"""
if not words:
return []
# Ensure argos is installed (will raise if it can't be)
_ensure_argos_installed()
# Initialize GPU if available (will raise if CUDA available but fails)
_init_gpu_if_available()
# Ensure language pair is available
_ensure_language_pair(from_lang, to_lang)
# Check cache for already-translated words
cached_results: dict[str, str] = {}
words_to_translate: list[str] = []
if use_cache:
try:
from python_pkg.word_frequency.cache import get_translation_cache
cache = get_translation_cache()
cached_results = cache.get_many(list(words), from_lang, to_lang)
except ImportError:
pass
# Fall back to individual translation
return translate_words(words, from_lang, to_lang)
# Find words that still need translation
for word in words:
if word.lower() not in cached_results:
words_to_translate.append(word)
# Translate uncached words using argos batch
new_translations: dict[str, str] = {}
if words_to_translate:
import sys
num_to_translate = len(words_to_translate)
# Check if GPU is being used
gpu_status = " (GPU)" if _gpu_available else " (CPU)"
print(
f"Translating {num_to_translate} words from {from_lang} to {to_lang}{gpu_status}...",
file=sys.stderr,
flush=True,
)
try:
# Split into batches - larger batches are faster but show progress less often
BATCH_SIZE = 100
batches: list[list[str]] = []
for i in range(0, num_to_translate, BATCH_SIZE):
batches.append(words_to_translate[i:i + BATCH_SIZE])
total_batches = len(batches)
# Sequential translation with progress
# (argostranslate is not thread-safe - uses global model)
for batch_idx, batch_words in enumerate(batches):
words_done = (batch_idx + 1) * BATCH_SIZE
words_done = min(words_done, num_to_translate)
pct = int(words_done / num_to_translate * 100)
print(
f" [{pct:3d}%] Translating batch {batch_idx + 1}/{total_batches} "
f"({words_done}/{num_to_translate} words)...",
file=sys.stderr,
flush=True,
)
_, batch_translations = _translate_batch_worker(
batch_words, from_lang, to_lang, batch_idx
)
new_translations.update(batch_translations)
print(f" Translation complete.", file=sys.stderr, flush=True)
except Exception as e: # noqa: BLE001
raise RuntimeError(
f"Translation failed for {from_lang} -> {to_lang}: {e}"
) from e
# Cache new translations
if use_cache and new_translations:
try:
from python_pkg.word_frequency.cache import get_translation_cache
get_translation_cache().set_many(new_translations, from_lang, to_lang)
except ImportError:
pass
# Merge cached and new translations
all_translations = {**cached_results, **new_translations}
# Build results in original order
results: list[TranslationResult] = []
for word in words:
translation = all_translations.get(word.lower(), "")
results.append(
TranslationResult(
source_word=word,
translated_word=translation,
source_lang=from_lang,
target_lang=to_lang,
success=bool(translation),
error=None if translation else "Translation failed",
)
)
return results
def format_translations(
@ -551,7 +827,12 @@ def main(argv: Sequence[str] | None = None) -> int:
return 1
# Translate
try:
results = translate_words_batch(words, args.from_lang, args.to_lang)
except ImportError as e:
print(f"Error: {e}", file=sys.stderr) # noqa: T201
return 1
output = format_translations(results)
# Output