feat: improvements in hosts bluetooth focus mode and backglog scripts

This commit is contained in:
Krzysztof kuhy Rudnicki 2026-03-11 20:47:03 +01:00
parent 497dcfd8b3
commit ee68d47a4c
7 changed files with 194 additions and 78 deletions

View File

@ -311,6 +311,11 @@ sudo sed -i 's/^0\.0\.0\.0 www\.4chan\.org/#0.0.0.0 www.4chan.org/' /etc/hosts
sudo sed -i 's/^0\.0\.0\.0 www\.facebook\.com/#0.0.0.0 www.facebook.com/' /etc/hosts
sudo sed -i 's/^0\.0\.0\.0 messenger\.com/#0.0.0.0 messenger.com/' /etc/hosts
# Allow LinkedIn and all subdomains (linkedin.com + licdn.com CDN)
echo "Allowing LinkedIn by commenting out any blocking entries..."
sudo sed -i -E 's/^(0\.0\.0\.0[[:space:]]+[a-zA-Z0-9._-]*\.?linkedin\.com)/#\1/' /etc/hosts
sudo sed -i -E 's/^(0\.0\.0\.0[[:space:]]+[a-zA-Z0-9._-]*\.?licdn\.com)/#\1/' /etc/hosts
# Add custom entries for YouTube and Discord
echo "Adding custom entries for YouTube and Discord..."
tee -a /etc/hosts >/dev/null <<'EOF'

View File

@ -262,7 +262,7 @@ _full_adapter_reset_and_connect() {
sleep 3
log_info "Reconnecting to $mac after adapter reset..."
{ echo "power on"; sleep 1; echo "connect $mac"; sleep 20; } \
{ echo "agent on"; echo "default-agent"; sleep 1; echo "power on"; sleep 1; echo "connect $mac"; sleep 20; } \
| bluetoothctl 2>/dev/null || true
}
@ -280,7 +280,7 @@ _verify_audio_sink() {
# Give PipeWire time to create the audio card
local _attempt
for _attempt in 1 2 3 4 5; do
if pactl list cards short 2>/dev/null | grep -q "$card_name"; then
if _run_as_user pactl list cards short 2>/dev/null | grep -q "$card_name"; then
log_ok "Bluetooth audio card detected in PipeWire."
return 0
fi
@ -426,7 +426,7 @@ check_pipewire_health() {
fi
# Test if PipeWire is responding within 3 seconds
if timeout 3 wpctl status &>/dev/null; then
if timeout 3 _run_as_user wpctl status &>/dev/null; then
log_ok "PipeWire is responsive."
return 0
fi
@ -436,8 +436,11 @@ check_pipewire_health() {
_restart_pipewire_stack
}
_restart_pipewire_stack() {
# Restart as the calling user (these are user services)
# ---------------------------------------------------------------------------
# Helper: run a command as the actual (non-root) user with PipeWire env.
# Needed because pactl/wpctl/systemctl --user talk to the user session.
# ---------------------------------------------------------------------------
_run_as_user() {
local target_user
target_user="${SUDO_USER:-$USER}"
local target_uid
@ -446,7 +449,11 @@ _restart_pipewire_stack() {
sudo -u "$target_user" \
XDG_RUNTIME_DIR="/run/user/$target_uid" \
DBUS_SESSION_BUS_ADDRESS="unix:path=/run/user/$target_uid/bus" \
systemctl --user restart pipewire pipewire-pulse wireplumber
"$@"
}
_restart_pipewire_stack() {
_run_as_user systemctl --user restart pipewire pipewire-pulse wireplumber
sleep 3
log_info "Waiting for audio stack to initialize..."
@ -478,7 +485,7 @@ connect_device() {
# ---- Attempt 1: direct connect (existing pairing) ----
if echo "$info" | grep -q "Paired: yes"; then
log_info "Device is already paired. Trying direct connect..."
{ echo "power on"; sleep 1; echo "connect $TARGET_MAC"; sleep 15; } \
{ echo "agent on"; echo "default-agent"; sleep 1; echo "power on"; sleep 1; echo "connect $TARGET_MAC"; sleep 15; } \
| bluetoothctl 2>/dev/null || true
if _check_connection_health "$TARGET_MAC"; then
@ -510,7 +517,7 @@ connect_device() {
# Pair
log_info "Pairing..."
{ echo "power on"; sleep 1; echo "pair $TARGET_MAC"; sleep 5; } \
{ echo "agent on"; echo "default-agent"; sleep 1; echo "power on"; sleep 1; echo "pair $TARGET_MAC"; sleep 5; } \
| bluetoothctl 2>/dev/null || true
# Trust (so it auto-reconnects in the future)
@ -519,7 +526,7 @@ connect_device() {
# Connect
log_info "Connecting..."
{ echo "power on"; sleep 1; echo "connect $TARGET_MAC"; sleep 15; } \
{ echo "agent on"; echo "default-agent"; sleep 1; echo "power on"; sleep 1; echo "connect $TARGET_MAC"; sleep 15; } \
| bluetoothctl 2>/dev/null || true
# Verify connection + services + audio
@ -595,7 +602,7 @@ set_audio_profile() {
local card_name="bluez_card.${TARGET_MAC//:/_}"
local card_info
card_info=$(pactl list cards 2>/dev/null || true)
card_info=$(_run_as_user pactl list cards 2>/dev/null || true)
if ! echo "$card_info" | grep -q "$card_name"; then
log_info "No PipeWire audio card found for device (may not be an audio device)."
@ -608,7 +615,7 @@ set_audio_profile() {
if [[ $current_profile == *"sbc_xq"* ]]; then
log_warn "SBC-XQ codec active — may cause audio dropouts on older adapters."
apply_fix "Switching to standard SBC codec" \
pactl set-card-profile "$card_name" a2dp-sink
_run_as_user pactl set-card-profile "$card_name" a2dp-sink
elif [[ -n $current_profile ]]; then
log_ok "Audio profile: $current_profile"
fi
@ -693,6 +700,7 @@ main() {
check_packages
check_firmware
check_adapter_stuck
remove_stale_pairing
fix_usb_autosuspend
check_pipewire_health
restart_bluetooth

View File

@ -79,6 +79,9 @@ com.microsoft.teams
# --- Manga reader ---
eu.kanade.tachiyomi.sy
# --- Development ---
com.github.android
"
# ============================================================

View File

@ -46,6 +46,28 @@ build_sysprotect_file() {
| sed 's/^[[:space:]]*//;s/[[:space:]]*$//' > "$STATE_DIR/sysprotect.txt"
}
reconcile_disabled_apps() {
[ -f "$DISABLED_APPS_FILE" ] || return
local tmp_disabled="$STATE_DIR/disabled_by_focus.tmp"
: > "$tmp_disabled"
while IFS= read -r pkg; do
[ -z "$pkg" ] && continue
if is_allowed "$pkg"; then
pm install-existing --user 0 "$pkg" >/dev/null 2>&1 || true
pm enable "$pkg" >/dev/null 2>&1 || true
log "Re-enabled allowed app during state reconciliation: $pkg"
continue
fi
echo "$pkg" >> "$tmp_disabled"
done < "$DISABLED_APPS_FILE"
mv "$tmp_disabled" "$DISABLED_APPS_FILE"
}
# ---- Initialization ----
init() {
mkdir -p "$STATE_DIR"
@ -66,6 +88,10 @@ init() {
CURRENT_MODE="normal"
fi
if [ "$CURRENT_MODE" = "focus" ]; then
reconcile_disabled_apps
fi
log "Focus mode daemon started (PID=$$, mode=$CURRENT_MODE, home=$HOME_LAT,$HOME_LON, radius=${RADIUS}m)"
log "Intervals: focus=${CHECK_INTERVAL_FOCUS}s normal=${CHECK_INTERVAL_NORMAL}s"
}
@ -116,13 +142,15 @@ is_allowed() {
# ---- Focus Mode Control ----
enable_focus_mode() {
[ "$CURRENT_MODE" = "focus" ] && return
if [ "$CURRENT_MODE" = "focus" ]; then
reconcile_disabled_apps
return
fi
log "ENABLING focus mode - restricting non-whitelisted apps"
: > "$DISABLED_APPS_FILE"
local tmp_pkgs="$STATE_DIR/pkg_list.txt"
pm list packages -3 2>/dev/null | sed 's/^package://' > "$tmp_pkgs"
while IFS= read -r pkg; do
[ -z "$pkg" ] && continue
is_allowed "$pkg" && continue
@ -153,6 +181,8 @@ enable_focus_mode() {
CURRENT_MODE="focus"
echo "focus" > "$MODE_FILE"
log "Focus mode enabled - disabled $count apps"
reconcile_disabled_apps
}
disable_focus_mode() {

View File

@ -18,6 +18,7 @@ from http import HTTPStatus
import json
import logging
import time
from typing import Any
import aiohttp
from howlongtobeatpy.HTMLRequests import HTMLRequests
@ -175,6 +176,31 @@ def _build_search_payload(game_name: str) -> str:
)
def _pick_best_hltb_entry(
search_name: str,
candidates: list[tuple[dict[str, Any], float]],
) -> tuple[dict[str, Any], float] | None:
"""Pick the best HLTB entry, preferring full editions over demos/chapters.
When a short name like "FAITH" matches both "FAITH" (demo) and
"FAITH: The Unholy Trinity" (full game), prefer the full game
since Steam often lists the full game under the shorter name.
"""
if not candidates:
return None
if len(candidates) == 1:
return candidates[0]
lower = search_name.lower()
for entry, sim in candidates:
entry_name = (entry.get("game_name") or "").lower()
if entry_name.startswith((lower + ":", lower + " -")):
return entry, sim
# Fall back to highest similarity.
return max(candidates, key=lambda x: x[1])
# ──────────────────────────────────────────────────────────────
# Async fetching with shared session & progress
# ──────────────────────────────────────────────────────────────
@ -211,6 +237,8 @@ async def _search_one(
) as resp:
if resp.status == HTTPStatus.OK:
data = await resp.json()
candidates: list[tuple[dict[str, Any], float]] = []
lower_name = name.lower()
for entry in data.get("data", []):
entry_name = entry.get("game_name", "")
entry_alias = entry.get("game_alias", "") or ""
@ -218,18 +246,24 @@ async def _search_one(
_similarity(name, entry_name),
_similarity(name, entry_alias),
)
if sim >= MIN_SIMILARITY:
is_full_edition = entry_name.lower().startswith(
lower_name + ":"
) or entry_name.lower().startswith(lower_name + " -")
if sim >= MIN_SIMILARITY or is_full_edition:
comp_100 = entry.get("comp_100", 0)
if comp_100 and comp_100 > 0:
hours = round(comp_100 / 3600, 2)
result = HLTBResult(
app_id=app_id,
game_name=name,
completionist_hours=hours,
similarity=sim,
hltb_game_id=entry.get("game_id", 0),
)
break
candidates.append((entry, sim))
best = _pick_best_hltb_entry(name, candidates)
if best is not None:
entry, sim = best
hours = round(entry["comp_100"] / 3600, 2)
result = HLTBResult(
app_id=app_id,
game_name=name,
completionist_hours=hours,
similarity=sim,
hltb_game_id=entry.get("game_id", 0),
)
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
logger.debug("HLTB search failed for '%s': %s", name, exc)

View File

@ -27,6 +27,7 @@ from python_pkg.steam_backlog_enforcer.enforcer import (
)
from python_pkg.steam_backlog_enforcer.hltb import (
fetch_hltb_times_cached,
load_hltb_cache,
)
from python_pkg.steam_backlog_enforcer.library_hider import (
hide_other_games,
@ -891,27 +892,6 @@ def cmd_list(_config: Config, state: State) -> None:
_echo(f"\n COMPLETE: {len(complete)} games")
def cmd_skip(config: Config, state: State) -> None:
"""Skip the currently assigned game."""
if state.current_app_id is None:
_echo("No game currently assigned.")
return
_echo(f"Skipping: {state.current_game_name}")
config.skip_app_ids.append(state.current_app_id)
config.save()
snapshot = load_snapshot()
if snapshot:
games = [GameInfo.from_snapshot(d) for d in snapshot]
pick_next_game(games, state, config)
else:
state.current_app_id = None
state.current_game_name = ""
state.save()
_echo("Run 'scan' to pick a new game.")
def cmd_unblock(_config: Config, _state: State) -> None:
"""Remove store blocking."""
if unblock_store():
@ -1083,6 +1063,79 @@ def cmd_unhide(config: Config, _state: State) -> None:
_echo("Done!")
def _try_reassign_shorter_game(
hltb_cache: dict[int, float],
app_id: int,
hours: float,
state: State,
config: Config,
) -> bool:
"""Check if a shorter game is available and reassign if so."""
snapshot_data = load_snapshot()
if not snapshot_data:
return False
all_games = [GameInfo.from_snapshot(d) for d in snapshot_data]
for g in all_games:
cached_hours = hltb_cache.get(g.app_id, -1.0)
if cached_hours > 0:
g.completionist_hours = cached_hours
skip = set(config.skip_app_ids) | set(state.finished_app_ids)
candidates = [
g
for g in all_games
if not g.is_complete and g.app_id not in skip and g.completionist_hours > 0
]
candidates.sort(key=lambda g: g.completionist_hours)
if not candidates or candidates[0].app_id == app_id:
return False
shortest = candidates[0]
_echo(
f"\n Reassigning: {shortest.name} is shorter"
f" (~{shortest.completionist_hours:.1f}h vs ~{hours:.1f}h)"
)
pick_next_game(all_games, state, config)
return True
def _finalize_completion(
config: Config,
state: State,
game_name: str,
app_id: int,
) -> None:
"""Mark game complete, pick next, hide non-assigned games, notify."""
_echo(f"\n COMPLETED: {game_name}!")
state.finished_app_ids.append(app_id)
snapshot_data = load_snapshot()
_echo("\nPicking next game...")
if not snapshot_data:
_echo(" No snapshot found. Run 'scan' first.")
state.current_app_id = None
state.current_game_name = ""
state.save()
return
games = [GameInfo.from_snapshot(d) for d in snapshot_data]
pick_next_game(games, state, config)
if state.current_app_id is None:
_echo(" No more games to assign!")
return
owned_ids = _get_all_owned_app_ids(config)
if owned_ids:
hidden = hide_other_games(owned_ids, state.current_app_id)
if hidden > 0:
_echo(f"\n Library: hid {hidden} games")
send_notification(
"Game Complete!",
f"Finished {game_name}! Now playing: {state.current_game_name}",
)
_echo(f"\nAll done! Go play {state.current_game_name}!")
def cmd_done(config: Config, state: State) -> None:
"""Check completion, pick next game, uninstall & hide.
@ -1112,44 +1165,23 @@ def cmd_done(config: Config, state: State) -> None:
f" ({game.completion_pct:.1f}%)"
)
hltb_cache = load_hltb_cache()
hours = hltb_cache.get(app_id, -1.0)
if hours < 0:
hltb_cache = fetch_hltb_times_cached([(app_id, game_name)])
hours = hltb_cache.get(app_id, -1.0)
if hours > 0:
_echo(f" HLTB 100% estimate: {hours:.1f} hours")
if _try_reassign_shorter_game(hltb_cache, app_id, hours, state, config):
return
if not game.is_complete:
remaining = game.total_achievements - game.unlocked_achievements
_echo(f"\n NOT COMPLETE: {remaining} achievements remaining. Keep going!")
return
# ── Step 1: Mark complete ──
_echo(f"\n COMPLETED: {game_name}!")
state.finished_app_ids.append(app_id)
# ── Step 2: Pick next game ──
snapshot_data = load_snapshot()
_echo("\nPicking next game...")
if not snapshot_data:
_echo(" No snapshot found. Run 'scan' first.")
state.current_app_id = None
state.current_game_name = ""
state.save()
return
games = [GameInfo.from_snapshot(d) for d in snapshot_data]
pick_next_game(games, state, config)
if state.current_app_id is None:
_echo(" No more games to assign!")
return
# ── Step 3: Hide non-assigned games in library ──
owned_ids = _get_all_owned_app_ids(config)
if owned_ids:
hidden = hide_other_games(owned_ids, state.current_app_id)
if hidden > 0:
_echo(f"\n Library: hid {hidden} games")
send_notification(
"Game Complete!",
f"Finished {game_name}! Now playing: {state.current_game_name}",
)
_echo(f"\nAll done! Go play {state.current_game_name}!")
_finalize_completion(config, state, game_name, app_id)
COMMANDS = {
@ -1157,7 +1189,6 @@ COMMANDS = {
"check": ("Check assigned game completion", do_check),
"status": ("Show current status", cmd_status),
"list": ("List games from snapshot", cmd_list),
"skip": ("Skip currently assigned game", cmd_skip),
"enforce": ("Run enforcer: block, uninstall, kill, hide", do_enforce),
"install": ("Install the assigned game", cmd_install),
"hide": ("Hide all non-assigned games in library", cmd_hide),

View File

@ -117,6 +117,11 @@ class SteamAPIClient:
self.api_key = api_key
self.steam_id = steam_id
self.session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
pool_maxsize=MAX_WORKERS,
pool_connections=MAX_WORKERS,
)
self.session.mount("https://", adapter)
self.session.headers["Accept"] = "application/json"
self._rate_lock = threading.Lock()
self._request_times: list[float] = []