mirror of
https://github.com/kuhyx/testsAndMisc.git
synced 2026-07-04 13:23:15 +02:00
feat: vocabulary curbe in C
This commit is contained in:
parent
c7e89c7951
commit
272b8c56d0
13
C/vocabulary_curve/Makefile
Normal file
13
C/vocabulary_curve/Makefile
Normal file
@ -0,0 +1,13 @@
|
||||
# Build configuration for the vocabulary_curve analyzer.
CC     = gcc
CFLAGS = -O3 -Wall -Wextra -march=native
TARGET = vocabulary_curve

# Default goal: build the analyzer binary.
all: $(TARGET)

# The program is a single translation unit, compiled and linked in one step.
$(TARGET): main.c
	$(CC) $(CFLAGS) -o $@ $<

# Remove the built binary.
clean:
	rm -f $(TARGET)

.PHONY: all clean
|
||||
359
C/vocabulary_curve/main.c
Normal file
359
C/vocabulary_curve/main.c
Normal file
@ -0,0 +1,359 @@
|
||||
/*
|
||||
* Vocabulary Learning Curve Analyzer
|
||||
*
|
||||
* For each excerpt length (1, 2, 3, ... N words), finds the excerpt that
|
||||
* requires the minimum number of top-frequency words to understand 100%.
|
||||
*
|
||||
* Usage:
|
||||
* ./vocabulary_curve <file.txt> [max_length]
|
||||
* ./vocabulary_curve test.txt 50
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <ctype.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
/* Fixed capacity limits for the analyzer. */
enum {
    MAX_WORD_LEN = 64,         /* Bytes per stored word, including the NUL */
    MAX_WORDS = 500000,        /* Maximum tokens kept in word_sequence */
    MAX_UNIQUE_WORDS = 100000, /* Maximum distinct words */
    HASH_SIZE = 200003         /* Bucket count; prime number for better distribution */
};

/* One distinct word; entries sharing a hash bucket are chained via next. */
typedef struct WordEntry {
    char word[MAX_WORD_LEN];
    int count;              /* Occurrences seen in the input */
    int rank;               /* 1-indexed rank by frequency (1 = most common) */
    struct WordEntry *next; /* Next entry in the same bucket, or NULL */
} WordEntry;

/* Bucket heads of the word hash table. */
static WordEntry *hash_table[HASH_SIZE];

/* Every distinct entry, in creation order until sorted by frequency. */
static WordEntry *all_entries[MAX_UNIQUE_WORDS];
static int num_unique_words = 0;

/*
 * The input text as a sequence of entry pointers, in order of appearance.
 * Pointers (not indices into all_entries) are stored so the sequence stays
 * valid when all_entries is reordered.
 */
static WordEntry *word_sequence[MAX_WORDS];
static int num_words = 0;

/* Best excerpt found for one excerpt length. */
typedef struct {
    int excerpt_length;
    int min_vocab_needed;
    int start_pos; /* Index of the excerpt's first word in word_sequence */
} ExcerptResult;
|
||||
|
||||
/* Simple hash function */
|
||||
static unsigned int hash_word(const char *word) {
|
||||
unsigned int hash = 5381;
|
||||
int c;
|
||||
while ((c = *word++)) {
|
||||
hash = ((hash << 5) + hash) + c;
|
||||
}
|
||||
return hash % HASH_SIZE;
|
||||
}
|
||||
|
||||
/* Find or create word entry */
|
||||
static WordEntry *get_or_create_word(const char *word) {
|
||||
unsigned int h = hash_word(word);
|
||||
WordEntry *entry = hash_table[h];
|
||||
|
||||
while (entry) {
|
||||
if (strcmp(entry->word, word) == 0) {
|
||||
return entry;
|
||||
}
|
||||
entry = entry->next;
|
||||
}
|
||||
|
||||
/* Create new entry */
|
||||
if (num_unique_words >= MAX_UNIQUE_WORDS) {
|
||||
fprintf(stderr, "Too many unique words\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
entry = malloc(sizeof(WordEntry));
|
||||
if (!entry) {
|
||||
fprintf(stderr, "Memory allocation failed\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
strncpy(entry->word, word, MAX_WORD_LEN - 1);
|
||||
entry->word[MAX_WORD_LEN - 1] = '\0';
|
||||
entry->count = 0;
|
||||
entry->rank = 0;
|
||||
entry->next = hash_table[h];
|
||||
hash_table[h] = entry;
|
||||
|
||||
all_entries[num_unique_words++] = entry;
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
/* Compare function for sorting by frequency (descending) */
|
||||
static int compare_by_count(const void *a, const void *b) {
|
||||
const WordEntry *wa = *(const WordEntry **)a;
|
||||
const WordEntry *wb = *(const WordEntry **)b;
|
||||
return wb->count - wa->count; /* Descending */
|
||||
}
|
||||
|
||||
/*
 * Return true if `c` belongs to a word: an alphanumeric character, '_',
 * or any byte value >= 128 (so multi-byte encoded text stays inside words).
 *
 * `c` is expected to be a value as returned by fgetc(). EOF is never a word
 * character. Other values are reduced to unsigned char before being handed
 * to isalnum(), whose argument must be representable as unsigned char.
 */
static bool is_word_char(int c) {
    if (c == EOF) {
        return false;
    }

    const unsigned char byte = (unsigned char)c;
    return isalnum(byte) || byte == '_' || byte >= 128;
}
|
||||
|
||||
/* Read and process file */
|
||||
static bool process_file(const char *filename) {
|
||||
FILE *fp = fopen(filename, "r");
|
||||
if (!fp) {
|
||||
fprintf(stderr, "Cannot open file: %s\n", filename);
|
||||
return false;
|
||||
}
|
||||
|
||||
char word[MAX_WORD_LEN];
|
||||
int word_len = 0;
|
||||
int c;
|
||||
|
||||
while ((c = fgetc(fp)) != EOF) {
|
||||
if (is_word_char(c)) {
|
||||
if (word_len < MAX_WORD_LEN - 1) {
|
||||
word[word_len++] = tolower(c);
|
||||
}
|
||||
} else if (word_len > 0) {
|
||||
word[word_len] = '\0';
|
||||
|
||||
WordEntry *entry = get_or_create_word(word);
|
||||
entry->count++;
|
||||
|
||||
if (num_words >= MAX_WORDS) {
|
||||
fprintf(stderr, "Too many words in file\n");
|
||||
fclose(fp);
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Store pointer directly - survives sorting */
|
||||
word_sequence[num_words++] = entry;
|
||||
|
||||
word_len = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* Handle last word if file doesn't end with whitespace */
|
||||
if (word_len > 0) {
|
||||
word[word_len] = '\0';
|
||||
WordEntry *entry = get_or_create_word(word);
|
||||
entry->count++;
|
||||
|
||||
if (num_words < MAX_WORDS) {
|
||||
word_sequence[num_words++] = entry;
|
||||
}
|
||||
}
|
||||
|
||||
fclose(fp);
|
||||
return true;
|
||||
}
|
||||
|
||||
/* Assign ranks based on frequency */
|
||||
static void assign_ranks(void) {
|
||||
/* Sort all_entries by frequency (this doesn't affect word_sequence) */
|
||||
qsort(all_entries, num_unique_words, sizeof(WordEntry *), compare_by_count);
|
||||
|
||||
/* Assign 1-indexed ranks */
|
||||
for (int i = 0; i < num_unique_words; i++) {
|
||||
all_entries[i]->rank = i + 1;
|
||||
}
|
||||
}
|
||||
|
||||
/* Analyze excerpt and return max rank needed */
|
||||
static int analyze_excerpt(int start, int length) {
|
||||
/* Track which entries we've seen using a simple visited array */
|
||||
/* We use the rank field is already assigned, so we can check uniqueness */
|
||||
static bool seen_rank[MAX_UNIQUE_WORDS + 1];
|
||||
memset(seen_rank, 0, (num_unique_words + 1) * sizeof(bool));
|
||||
|
||||
int max_rank = 0;
|
||||
|
||||
for (int i = start; i < start + length; i++) {
|
||||
WordEntry *entry = word_sequence[i];
|
||||
int rank = entry->rank;
|
||||
|
||||
if (!seen_rank[rank]) {
|
||||
seen_rank[rank] = true;
|
||||
if (rank > max_rank) {
|
||||
max_rank = rank;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return max_rank;
|
||||
}
|
||||
|
||||
/* Find optimal excerpts for each length */
|
||||
static void find_optimal_excerpts(int max_length, ExcerptResult *results) {
|
||||
for (int length = 1; length <= max_length && length <= num_words; length++) {
|
||||
int best_vocab = num_unique_words + 1;
|
||||
int best_start = 0;
|
||||
|
||||
/* Slide window through text */
|
||||
for (int start = 0; start <= num_words - length; start++) {
|
||||
int vocab_needed = analyze_excerpt(start, length);
|
||||
|
||||
if (vocab_needed < best_vocab) {
|
||||
best_vocab = vocab_needed;
|
||||
best_start = start;
|
||||
}
|
||||
}
|
||||
|
||||
results[length - 1].excerpt_length = length;
|
||||
results[length - 1].min_vocab_needed = best_vocab;
|
||||
results[length - 1].start_pos = best_start;
|
||||
}
|
||||
}
|
||||
|
||||
/* Print excerpt words */
|
||||
static void print_excerpt(int start, int length) {
|
||||
for (int i = start; i < start + length; i++) {
|
||||
if (i > start) printf(" ");
|
||||
printf("%s", word_sequence[i]->word);
|
||||
}
|
||||
}
|
||||
|
||||
/* Print words needed (sorted by rank) */
|
||||
static void print_words_needed(int start, int length) {
|
||||
/* Collect unique entries */
|
||||
static WordEntry *unique_entries[MAX_UNIQUE_WORDS];
|
||||
static bool seen_rank[MAX_UNIQUE_WORDS + 1];
|
||||
memset(seen_rank, 0, (num_unique_words + 1) * sizeof(bool));
|
||||
|
||||
int count = 0;
|
||||
for (int i = start; i < start + length; i++) {
|
||||
WordEntry *entry = word_sequence[i];
|
||||
if (!seen_rank[entry->rank]) {
|
||||
seen_rank[entry->rank] = true;
|
||||
unique_entries[count++] = entry;
|
||||
}
|
||||
}
|
||||
|
||||
/* Sort by rank (simple bubble sort - small arrays) */
|
||||
for (int i = 0; i < count - 1; i++) {
|
||||
for (int j = i + 1; j < count; j++) {
|
||||
if (unique_entries[i]->rank > unique_entries[j]->rank) {
|
||||
WordEntry *tmp = unique_entries[i];
|
||||
unique_entries[i] = unique_entries[j];
|
||||
unique_entries[j] = tmp;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Print */
|
||||
for (int i = 0; i < count; i++) {
|
||||
if (i > 0) printf(", ");
|
||||
printf("%s(#%d)", unique_entries[i]->word, unique_entries[i]->rank);
|
||||
}
|
||||
}
|
||||
|
||||
/* Print results */
|
||||
static void print_results(ExcerptResult *results, int max_length) {
|
||||
printf("======================================================================\n");
|
||||
printf("VOCABULARY LEARNING CURVE\n");
|
||||
printf("======================================================================\n");
|
||||
printf("\n");
|
||||
printf("For each excerpt length, the minimum number of top-frequency\n");
|
||||
printf("words you need to learn to understand 100%% of some excerpt.\n");
|
||||
printf("\n");
|
||||
printf("Total words in text: %d\n", num_words);
|
||||
printf("Unique words: %d\n", num_unique_words);
|
||||
printf("\n");
|
||||
printf("----------------------------------------------------------------------\n");
|
||||
|
||||
int prev_vocab = 0;
|
||||
int actual_max = max_length;
|
||||
if (actual_max > num_words) actual_max = num_words;
|
||||
|
||||
for (int i = 0; i < actual_max; i++) {
|
||||
ExcerptResult *r = &results[i];
|
||||
|
||||
printf("\n[Length %d] Vocab needed: %d", r->excerpt_length, r->min_vocab_needed);
|
||||
if (r->min_vocab_needed > prev_vocab) {
|
||||
printf(" (+%d)", r->min_vocab_needed - prev_vocab);
|
||||
}
|
||||
printf("\n");
|
||||
|
||||
printf(" Excerpt: \"");
|
||||
print_excerpt(r->start_pos, r->excerpt_length);
|
||||
printf("\"\n");
|
||||
|
||||
printf(" Words: ");
|
||||
print_words_needed(r->start_pos, r->excerpt_length);
|
||||
printf("\n");
|
||||
|
||||
prev_vocab = r->min_vocab_needed;
|
||||
}
|
||||
|
||||
printf("\n----------------------------------------------------------------------\n");
|
||||
|
||||
if (actual_max > 0) {
|
||||
ExcerptResult *final = &results[actual_max - 1];
|
||||
printf("\nTo understand a %d-word excerpt,\n", final->excerpt_length);
|
||||
printf("you need to learn at minimum %d top words.\n", final->min_vocab_needed);
|
||||
}
|
||||
}
|
||||
|
||||
/* Free memory */
|
||||
static void cleanup(void) {
|
||||
for (int i = 0; i < num_unique_words; i++) {
|
||||
free(all_entries[i]);
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
if (argc < 2) {
|
||||
fprintf(stderr, "Usage: %s <file.txt> [max_length]\n", argv[0]);
|
||||
fprintf(stderr, " max_length: maximum excerpt length to analyze (default: 30)\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
const char *filename = argv[1];
|
||||
int max_length = 30;
|
||||
|
||||
if (argc >= 3) {
|
||||
max_length = atoi(argv[2]);
|
||||
if (max_length < 1) max_length = 1;
|
||||
if (max_length > 1000) max_length = 1000;
|
||||
}
|
||||
|
||||
/* Initialize hash table */
|
||||
memset(hash_table, 0, sizeof(hash_table));
|
||||
|
||||
/* Process file */
|
||||
if (!process_file(filename)) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (num_words == 0) {
|
||||
fprintf(stderr, "No words found in file\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Assign ranks by frequency */
|
||||
assign_ranks();
|
||||
|
||||
/* Find optimal excerpts */
|
||||
ExcerptResult *results = malloc(max_length * sizeof(ExcerptResult));
|
||||
if (!results) {
|
||||
fprintf(stderr, "Memory allocation failed\n");
|
||||
cleanup();
|
||||
return 1;
|
||||
}
|
||||
|
||||
find_optimal_excerpts(max_length, results);
|
||||
|
||||
/* Print results */
|
||||
print_results(results, max_length);
|
||||
|
||||
/* Cleanup */
|
||||
free(results);
|
||||
cleanup();
|
||||
|
||||
return 0;
|
||||
}
|
||||
BIN
C/vocabulary_curve/vocabulary_curve
Executable file
BIN
C/vocabulary_curve/vocabulary_curve
Executable file
Binary file not shown.
244
python_pkg/word_frequency/tests/test_vocabulary_curve.py
Normal file
244
python_pkg/word_frequency/tests/test_vocabulary_curve.py
Normal file
@ -0,0 +1,244 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for vocabulary_curve C implementation."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
# Path to the C executable
|
||||
C_EXECUTABLE = Path(__file__).parent.parent.parent.parent / "C" / "vocabulary_curve" / "vocabulary_curve"
|
||||
|
||||
|
||||
@pytest.fixture
def sample_text_file(tmp_path: Path) -> Path:
    """Write a three-sentence English sample to ``sample.txt`` and return its path."""
    sentences = (
        "The quick brown fox jumps over the lazy dog.",
        "The fox was very quick and the dog was very lazy.",
        "Quick foxes and lazy dogs are common in stories.",
    )
    target = tmp_path / "sample.txt"
    target.write_text("\n".join(sentences), encoding="utf-8")
    return target
|
||||
|
||||
|
||||
@pytest.fixture
def polish_text_file(tmp_path: Path) -> Path:
    """Write a four-line Polish sample to ``polish.txt`` (UTF-8) and return its path."""
    verses = (
        "Litwo! Ojczyzno moja! Ty jesteś jak zdrowie.",
        "Ile cię trzeba cenić, ten tylko się dowie,",
        "Kto cię stracił. Dziś piękność twą w całej ozdobie",
        "Widzę i opisuję, bo tęsknię po tobie.",
    )
    target = tmp_path / "polish.txt"
    target.write_text("\n".join(verses), encoding="utf-8")
    return target
|
||||
|
||||
|
||||
def run_vocabulary_curve(filepath: Path, max_length: int = 10) -> str:
    """Run the C executable on ``filepath`` and return its captured stdout.

    The calling test is skipped when the executable has not been built.
    The process is given 30 seconds; its exit status is not inspected.
    """
    if not C_EXECUTABLE.exists():
        pytest.skip(f"C executable not found at {C_EXECUTABLE}")

    command = [str(C_EXECUTABLE), str(filepath), str(max_length)]
    completed = subprocess.run(command, capture_output=True, text=True, timeout=30)
    return completed.stdout
|
||||
|
||||
|
||||
def extract_excerpts_from_output(output: str) -> list[tuple[int, str]]:
    """Extract ``(length, excerpt)`` pairs from the program's report.

    A pair is produced for each ``[Length N] ...`` header that is followed,
    before the next header, by an ``Excerpt: "..."`` line. The excerpt is the
    text between the first and last double quote on that line.

    A header with no excerpt line of its own is dropped, so it cannot be
    paired with the excerpt that belongs to a later header. Excerpt lines
    seen before any header, or without a double quote, are ignored.
    """
    excerpts: list[tuple[int, str]] = []
    pending_length: int | None = None

    for raw_line in output.split("\n"):
        line = raw_line.strip()

        if line.startswith("[Length "):
            # "[Length 12] Vocab needed: ..." -> 12
            pending_length = int(line.split("]")[0].split()[-1])
            continue

        if pending_length is None or not line.startswith("Excerpt:"):
            continue

        if '"' in line:
            start = line.index('"') + 1
            end = line.rindex('"')
            excerpts.append((pending_length, line[start:end]))
        pending_length = None

    return excerpts
|
||||
|
||||
|
||||
class TestExcerptValidity:
    """Checks that every reported excerpt really occurs in the source text."""

    @staticmethod
    def _source_words(path: Path) -> list[str]:
        """Return the lower-cased ``\\w+`` tokens of the file at ``path``."""
        import re

        return re.findall(r'\b[\w]+\b', path.read_text(encoding="utf-8").lower())

    @staticmethod
    def _has_run(source_words: list[str], excerpt_words: list[str], window: int) -> bool:
        """Return True if some ``window``-sized slice of ``source_words`` equals ``excerpt_words``."""
        return any(
            source_words[i:i + window] == excerpt_words
            for i in range(len(source_words) - window + 1)
        )

    def test_excerpt_exists_in_source_text(self, sample_text_file: Path) -> None:
        """Each excerpt appears in the English sample as consecutive words."""
        source_words = self._source_words(sample_text_file)
        output = run_vocabulary_curve(sample_text_file, max_length=10)
        excerpts = extract_excerpts_from_output(output)

        assert len(excerpts) > 0, "No excerpts found in output"

        for length, excerpt in excerpts:
            excerpt_words = excerpt.lower().split()
            found = self._has_run(source_words, excerpt_words, len(excerpt_words))
            assert found, (
                f"Excerpt of length {length} not found in source text:\n"
                f" Excerpt words: {excerpt_words}\n"
                f" First 30 source words: {source_words[:30]}"
            )

    def test_excerpt_word_count_matches_length(self, sample_text_file: Path) -> None:
        """The number of words in each excerpt equals its reported length."""
        output = run_vocabulary_curve(sample_text_file, max_length=10)

        for length, excerpt in extract_excerpts_from_output(output):
            word_count = len(excerpt.split())
            assert word_count == length, (
                f"Expected {length} words, got {word_count}: '{excerpt}'"
            )

    def test_polish_excerpt_exists_in_source(self, polish_text_file: Path) -> None:
        """Each excerpt appears in the Polish sample as consecutive words."""
        source_words = self._source_words(polish_text_file)
        output = run_vocabulary_curve(polish_text_file, max_length=8)
        excerpts = extract_excerpts_from_output(output)

        assert len(excerpts) > 0, "No excerpts found in output"

        for length, excerpt in excerpts:
            excerpt_words = excerpt.lower().split()
            found = self._has_run(source_words, excerpt_words, len(excerpt_words))
            assert found, (
                f"Polish excerpt of length {length} not found:\n"
                f" Excerpt words: {excerpt_words}\n"
                f" Source words: {source_words}"
            )

    def test_excerpt_is_contiguous(self, sample_text_file: Path) -> None:
        """Each excerpt matches a source slice of exactly the reported length."""
        source_words = self._source_words(sample_text_file)
        output = run_vocabulary_curve(sample_text_file, max_length=5)

        for length, excerpt in extract_excerpts_from_output(output):
            excerpt_words = excerpt.lower().split()
            # The window is the reported length, not len(excerpt_words).
            found = self._has_run(source_words, excerpt_words, length)
            assert found, (
                f"Excerpt words not found as contiguous sequence:\n"
                f" Excerpt: {excerpt_words}\n"
                f" First 20 source words: {source_words[:20]}"
            )
|
||||
|
||||
|
||||
class TestVocabNeeded:
    """Tests for the reported vocabulary counts."""

    def test_length_1_needs_vocab_1(self, sample_text_file: Path) -> None:
        """A 1-word excerpt needs exactly 1 vocabulary word."""
        output = run_vocabulary_curve(sample_text_file, max_length=1)

        assert "[Length 1] Vocab needed: 1" in output

    def test_vocab_needed_increases_monotonically(self, sample_text_file: Path) -> None:
        """Vocab needed never decreases as the excerpt length increases."""
        output = run_vocabulary_curve(sample_text_file, max_length=10)

        values_seen = 0
        prev_vocab = 0
        for line in output.split("\n"):
            if "Vocab needed:" not in line:
                continue
            # "... Vocab needed: X (+d)" -> X
            vocab = int(line.split("Vocab needed:")[1].split()[0])
            assert vocab >= prev_vocab, (
                f"Vocab decreased from {prev_vocab} to {vocab}"
            )
            prev_vocab = vocab
            values_seen += 1

        # Without this the test would pass on empty output.
        assert values_seen > 0, "No 'Vocab needed:' lines found in output"
|
||||
|
||||
|
||||
class TestEdgeCases:
    """Edge case tests."""

    def test_empty_file(self, tmp_path: Path) -> None:
        """An empty file yields a non-zero exit status or a 'No words' message."""
        filepath = tmp_path / "empty.txt"
        filepath.write_text("", encoding="utf-8")

        if not C_EXECUTABLE.exists():
            pytest.skip("C executable not found")

        # Called directly because the exit status and stderr are needed;
        # uses the same timeout as run_vocabulary_curve.
        result = subprocess.run(
            [str(C_EXECUTABLE), str(filepath), "5"],
            capture_output=True,
            text=True,
            timeout=30,
        )

        assert result.returncode != 0 or "No words" in result.stderr

    def test_single_word_file(self, tmp_path: Path) -> None:
        """A one-word file produces a result for length 1 only."""
        filepath = tmp_path / "single.txt"
        filepath.write_text("hello", encoding="utf-8")

        output = run_vocabulary_curve(filepath, max_length=5)

        assert "[Length 1] Vocab needed: 1" in output
        # Should only have 1 length since there's only 1 word
        assert "[Length 2]" not in output

    def test_repeated_word_file(self, tmp_path: Path) -> None:
        """A file repeating one word needs 1 vocabulary word at every length."""
        filepath = tmp_path / "repeated.txt"
        filepath.write_text("hello hello hello hello hello", encoding="utf-8")

        output = run_vocabulary_curve(filepath, max_length=5)

        # All excerpts should need only 1 vocabulary word
        for i in range(1, 6):
            assert f"[Length {i}] Vocab needed: 1" in output
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
319
python_pkg/word_frequency/vocabulary_curve.py
Normal file
319
python_pkg/word_frequency/vocabulary_curve.py
Normal file
@ -0,0 +1,319 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Vocabulary learning curve analyzer.
|
||||
|
||||
Finds the minimum vocabulary needed to understand excerpts of increasing length.
|
||||
For each excerpt length (1, 2, 3, ... N words), finds the excerpt that requires
|
||||
the fewest top-frequency words to understand 100%.
|
||||
|
||||
Usage:
|
||||
python -m python_pkg.word_frequency.vocabulary_curve --file text.txt
|
||||
python -m python_pkg.word_frequency.vocabulary_curve --file text.txt --max-length 50
|
||||
python -m python_pkg.word_frequency.vocabulary_curve --text "some text here"
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, NamedTuple
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Sequence
|
||||
|
||||
try:
|
||||
from python_pkg.word_frequency.analyzer import analyze_text, read_file
|
||||
except ImportError:
|
||||
from analyzer import analyze_text, read_file
|
||||
|
||||
|
||||
class ExcerptAnalysis(NamedTuple):
    """The best excerpt found for one excerpt length."""

    # Number of words in the excerpt.
    excerpt_length: int
    # Highest frequency rank among the excerpt's words.
    min_vocab_needed: int
    # The excerpt's words joined by single spaces.
    best_excerpt: str
    # The excerpt's distinct words, ordered by ascending rank.
    words_needed: list[str]
|
||||
|
||||
|
||||
def get_word_rank(word: str, ranked_words: list[str]) -> int | None:
    """Return the 1-indexed position of ``word`` in ``ranked_words``.

    Args:
        word: The word to look up.
        ranked_words: Words ordered by frequency, most common first.

    Returns:
        The position of the first match (1 for the first element), or
        None when ``word`` does not occur in ``ranked_words``.
    """
    for position, candidate in enumerate(ranked_words, start=1):
        if candidate == word:
            return position
    return None
|
||||
|
||||
|
||||
def analyze_excerpt(
    excerpt_words: list[str],
    ranked_words: list[str],
) -> tuple[int, list[str]]:
    """Work out how many top-ranked words an excerpt requires.

    Args:
        excerpt_words: The words of the excerpt; repeats are allowed.
        ranked_words: All words ordered by frequency, most common first.

    Returns:
        ``(max_rank, words)``: the highest 1-indexed rank among the excerpt's
        distinct words, and those words ordered by ascending rank. An empty
        excerpt gives ``(0, [])``. If any word is missing from
        ``ranked_words`` the result is ``(float("inf"), [])``.
    """
    ranked: list[tuple[int, str]] = []

    for word in set(excerpt_words):
        try:
            position = ranked_words.index(word)
        except ValueError:
            # An unranked word can never be covered by a top-N vocabulary.
            return float("inf"), []  # type: ignore[return-value]
        ranked.append((position + 1, word))

    if not ranked:
        return 0, []

    ranked.sort()
    highest_rank = ranked[-1][0]
    return highest_rank, [word for _, word in ranked]
|
||||
|
||||
|
||||
def find_optimal_excerpts(
    text: str,
    *,
    max_length: int = 30,
    case_sensitive: bool = False,
) -> list[ExcerptAnalysis]:
    """Find the cheapest excerpt for each excerpt length.

    For every length from 1 to ``max_length`` (capped at the number of words
    in the text), slides a window over the text and keeps the first window
    whose highest word rank is lowest.

    Ranks are looked up in a dictionary built once from the frequency
    counts, so each window costs time proportional to its length rather
    than a scan of the ranked word list per word.

    Args:
        text: The source text to analyze.
        max_length: Maximum excerpt length to analyze.
        case_sensitive: Whether to treat words case-sensitively.

    Returns:
        One ExcerptAnalysis per length, in increasing length order. A length
        is omitted when every window of that length contains a word that has
        no rank in the frequency counts. Empty when the text has no words.
    """
    import re

    # Rank 1 is the most common word; keep the first rank if a word repeats.
    word_counts = analyze_text(text, case_sensitive=case_sensitive)
    rank_of: dict[str, int] = {}
    for position, (word, _) in enumerate(word_counts.most_common(), start=1):
        rank_of.setdefault(word, position)

    # All words of the text, in order of appearance.
    all_words = re.findall(r"\b[\w]+\b", text, re.UNICODE)
    if not case_sensitive:
        all_words = [w.lower() for w in all_words]

    if not all_words:
        return []

    # A word without a rank makes any window containing it unusable.
    unranked = float("inf")
    word_ranks = [rank_of.get(w, unranked) for w in all_words]

    results: list[ExcerptAnalysis] = []

    for length in range(1, min(max_length, len(all_words)) + 1):
        best_rank = unranked
        best_start = -1

        for start in range(len(all_words) - length + 1):
            window_rank = max(word_ranks[start : start + length])
            if window_rank < best_rank:
                best_rank = window_rank
                best_start = start

        if best_start < 0:
            continue

        excerpt_words = all_words[best_start : best_start + length]
        words_needed = sorted(set(excerpt_words), key=rank_of.__getitem__)
        results.append(
            ExcerptAnalysis(
                excerpt_length=length,
                min_vocab_needed=int(best_rank),
                best_excerpt=" ".join(excerpt_words),
                words_needed=words_needed,
            )
        )

    return results
|
||||
|
||||
|
||||
def format_results(
    results: list[ExcerptAnalysis],
    *,
    show_excerpts: bool = False,
    show_words: bool = False,
) -> str:
    """Render analysis results as a plain-text table.

    Args:
        results: Analysis results, one per excerpt length.
        show_excerpts: Add a column with the excerpt text, cut to 50
            characters. The "(+N)" increase marker is not shown in this mode.
        show_words: Add a line under each row listing the words needed.

    Returns:
        The report as a single string, or "No excerpts found." when
        ``results`` is empty.
    """
    if not results:
        return "No excerpts found."

    heavy_rule = "=" * 70
    light_rule = "-" * 70

    lines: list[str] = [
        heavy_rule,
        "VOCABULARY LEARNING CURVE",
        heavy_rule,
        "",
        "For each excerpt length, the minimum number of top-frequency",
        "words you need to learn to understand 100% of some excerpt.",
        "",
        light_rule,
    ]

    # Column headers
    if show_excerpts:
        lines += [
            f"{'Length':>6} {'Vocab':>5} Excerpt",
            f"{'------':>6} {'-----':>5} {'-------'}",
        ]
    else:
        lines += [
            f"{'Length':>6} {'Vocab Needed':>12}",
            f"{'------':>6} {'------------':>12}",
        ]

    prev_vocab = 0
    for r in results:
        increase = r.min_vocab_needed - prev_vocab
        marker = f" (+{increase})" if increase > 0 else ""
        prev_vocab = r.min_vocab_needed

        if show_excerpts:
            excerpt = r.best_excerpt
            if len(excerpt) > 50:
                excerpt = excerpt[:47] + "..."
            row = f"{r.excerpt_length:>6} {r.min_vocab_needed:>5} {excerpt}"
        else:
            row = f"{r.excerpt_length:>6} {r.min_vocab_needed:>12}{marker}"
        lines.append(row)

        if show_words and r.words_needed:
            lines.append(f" Words: {', '.join(r.words_needed)}")

    # Closing summary for the longest analysed length
    final = results[-1]
    lines += [
        light_rule,
        "",
        f"To understand a {final.excerpt_length}-word excerpt,",
        f"you need to learn at minimum {final.min_vocab_needed} top words.",
    ]

    return "\n".join(lines)
|
||||
|
||||
|
||||
def main(argv: Sequence[str] | None = None) -> int:
    """Command-line entry point.

    Parses the arguments, analyzes the text given with ``--text`` or read
    from ``--file``, and prints the formatted report or writes it to the
    ``--output`` path.

    Args:
        argv: Command line arguments; None lets argparse use ``sys.argv``.

    Returns:
        0 on success, 1 when the input file is missing or cannot be decoded.
    """
    parser = argparse.ArgumentParser(
        description="Analyze minimum vocabulary needed for excerpt lengths.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument(
        "--text",
        "-t",
        type=str,
        help="Raw text to analyze",
    )
    input_group.add_argument(
        "--file",
        "-f",
        type=str,
        help="Path to a file to analyze",
    )

    parser.add_argument(
        "--max-length",
        "-m",
        type=int,
        default=30,
        help="Maximum excerpt length to analyze (default: 30)",
    )
    parser.add_argument(
        "--show-excerpts",
        "-e",
        action="store_true",
        help="Show the actual excerpt text for each length",
    )
    parser.add_argument(
        "--show-words",
        "-w",
        action="store_true",
        help="Show which words are needed for each excerpt",
    )
    parser.add_argument(
        "--case-sensitive",
        "-c",
        action="store_true",
        help="Treat words case-sensitively",
    )
    parser.add_argument(
        "--output",
        "-o",
        type=str,
        help="Output file path (default: print to stdout)",
    )

    args = parser.parse_args(argv)

    try:
        # Compare against None: `--text ""` is a valid (empty) input and
        # must not fall through to read_file() with args.file unset.
        if args.text is not None:
            text = args.text
        else:
            text = read_file(args.file)

        results = find_optimal_excerpts(
            text,
            max_length=args.max_length,
            case_sensitive=args.case_sensitive,
        )

        output = format_results(
            results,
            show_excerpts=args.show_excerpts,
            show_words=args.show_words,
        )

        if args.output:
            Path(args.output).write_text(output, encoding="utf-8")
            print(f"Output written to {args.output}")  # noqa: T201
        else:
            print(output)  # noqa: T201

    except FileNotFoundError as e:
        print(f"Error: File not found - {e}", file=sys.stderr)  # noqa: T201
        return 1
    except UnicodeDecodeError as e:
        print(f"Error: Could not decode file - {e}", file=sys.stderr)  # noqa: T201
        return 1

    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Loading…
Reference in New Issue
Block a user