This commit is contained in:
Bastian Wagner
2026-05-05 19:26:43 +02:00
commit 8d07939527
29 changed files with 2646 additions and 0 deletions

13
.dockerignore Normal file
View File

@@ -0,0 +1,13 @@
.env
.venv
__pycache__
.pytest_cache
.ruff_cache
.git
data
htmlcov
.coverage
dist
build
*.egg-info

35
.env.example Normal file
View File

@@ -0,0 +1,35 @@
MYWHOOSH_EMAIL=
MYWHOOSH_PASSWORD=
GARMIN_EMAIL=
GARMIN_PASSWORD=
POLL_INTERVAL_SECONDS=3600
DATA_DIR=/data
LOG_LEVEL=INFO
DRY_RUN=true
DASHBOARD_ENABLED=true
DASHBOARD_BIND=0.0.0.0
DASHBOARD_PORT=8080
MYWHOOSH_LOGIN_URL=https://www.event.mywhoosh.com/login/
MYWHOOSH_ACTIVITY_URL=https://event.mywhoosh.com/user/activities
MYWHOOSH_HEADLESS=true
MYWHOOSH_BROWSER_STATE_DIR=/data/browser
MYWHOOSH_AUTH_STATE_PATH=/data/mywhoosh_auth_state.json
MYWHOOSH_TIMEOUT_SECONDS=45
MYWHOOSH_MAX_DOWNLOADS_PER_RUN=10
MYWHOOSH_DOWNLOAD_TEXT_HINTS=fit,download
MYWHOOSH_ACTIVITIES_BUTTON_TEXT=ACTIVITIES
MYWHOOSH_DOWNLOAD_BUTTON_SELECTOR=.btnDownload
MYWHOOSH_SLOW_MO_MS=0
MYWHOOSH_MANUAL_LOGIN_WAIT_SECONDS=0
MYWHOOSH_DEBUG_SCREENSHOTS=false
MYWHOOSH_DEBUG_DIR=/data/debug
GARMIN_TOKENSTORE=/data/garmin_tokens
GARMIN_MFA_CODE=
TARGET_GARMIN_MANUFACTURER_ID=1
TARGET_GARMIN_PRODUCT_ID=3578
TARGET_GARMIN_PRODUCT_NAME=Edge 1030 Plus
TARGET_GARMIN_SERIAL_NUMBER=

13
.gitignore vendored Normal file
View File

@@ -0,0 +1,13 @@
.env
.venv/
__pycache__/
.pytest_cache/
.ruff_cache/
*.pyc
htmlcov/
.coverage
data/
dist/
build/
*.egg-info/

41
Dockerfile Normal file
View File

@@ -0,0 +1,41 @@
FROM python:3.12-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1
WORKDIR /app
ARG BROWSER_DEBUG_TOOLS=false
RUN apt-get update \
&& apt-get install -y --no-install-recommends ca-certificates curl \
&& rm -rf /var/lib/apt/lists/*
RUN if [ "$BROWSER_DEBUG_TOOLS" = "true" ]; then \
apt-get update \
&& apt-get install -y --no-install-recommends \
fluxbox \
novnc \
websockify \
x11vnc \
xvfb \
&& rm -rf /var/lib/apt/lists/*; \
fi
COPY requirements.txt requirements-dev.txt pyproject.toml ./
COPY src ./src
COPY tests ./tests
COPY scripts ./scripts
RUN pip install --upgrade pip \
&& pip install -r requirements.txt \
&& pip install . \
&& chmod +x /app/scripts/browser-debug.sh \
&& python -m playwright install --with-deps chromium
COPY README.md ./
VOLUME ["/data"]
ENTRYPOINT ["python", "-m", "mywhoosh_garmin_sync"]
CMD ["serve"]

152
README.md Normal file
View File

@@ -0,0 +1,152 @@
# MyWhoosh Garmin Sync
Containerized background sync for personal MyWhoosh activities:
1. Log in to MyWhoosh with Playwright.
2. Find and download new `.fit` activity files.
3. Rewrite FIT device metadata locally to Garmin Edge 1030 Plus.
4. Upload the converted activity to Garmin Connect.
5. Persist sessions, downloaded files, converted files, and sync state under `/data`.
## Quick Start
Create your local env file:
```sh
cp .env.example .env
```
Edit `.env`, then run:
```sh
docker compose up -d --build
docker compose logs -f sync
```
The default polling interval is hourly.
## Configuration
Required values:
```env
MYWHOOSH_EMAIL=
MYWHOOSH_PASSWORD=
GARMIN_EMAIL=
GARMIN_PASSWORD=
```
Useful optional values:
```env
POLL_INTERVAL_SECONDS=3600
DATA_DIR=/data
LOG_LEVEL=INFO
DRY_RUN=false
MYWHOOSH_LOGIN_URL=https://www.mywhoosh.com/login/
MYWHOOSH_ACTIVITY_URL=https://www.mywhoosh.com/profile/
MYWHOOSH_ACTIVITIES_BUTTON_TEXT=ACTIVITIES
MYWHOOSH_DOWNLOAD_BUTTON_SELECTOR=.btnDownload
TARGET_GARMIN_PRODUCT_ID=3578
TARGET_GARMIN_SERIAL_NUMBER=
```
If Garmin MFA is required on first login, set `GARMIN_MFA_CODE` for one run, let the token store persist under `/data/garmin_tokens`, then remove the variable.
## Commands
Inside the container:
```sh
python -m mywhoosh_garmin_sync run-once
python -m mywhoosh_garmin_sync serve
python -m mywhoosh_garmin_sync convert --input /data/raw/example.fit --output /data/converted/example.fit
```
`DRY_RUN=true` downloads and converts files but does not upload to Garmin.
Run tests from the built image:
```sh
docker build -t mywhoosh-garmin-sync:test .
docker run --rm --entrypoint sh mywhoosh-garmin-sync:test -c "pip install -r requirements-dev.txt && pytest"
```
## Browser Debugging
Use the debug compose file when MyWhoosh blocks automated login with cookies, captcha, or bot checks:
```sh
docker compose -f docker-compose.debug.yml up --build
```
Then open noVNC:
```text
http://localhost:6080/vnc.html
```
Click `Connect`. You should see Chromium inside the container. Cookie consent is accepted automatically where the banner uses a normal consent button. Captcha/bot checks are left for you to solve manually. The debug run uses:
```env
MYWHOOSH_HEADLESS=false
MYWHOOSH_MANUAL_LOGIN_WAIT_SECONDS=900
MYWHOOSH_SLOW_MO_MS=250
MYWHOOSH_DEBUG_SCREENSHOTS=true
DRY_RUN=true
```
If this runs on a remote Linux server, keep the default localhost-only port binding and tunnel it:
```sh
ssh -L 6080:localhost:6080 user@your-server
```
Then open `http://localhost:6080/vnc.html` on your own machine. During the 900-second manual-login window, accept cookies, solve the challenge, and finish the MyWhoosh login in the visible browser. The browser profile is stored in `./data/browser`, so the normal headless service can reuse the session later.
Debug screenshots are written to:
```text
./data/debug
```
After a successful manual login, stop the debug container and run the normal service:
```sh
docker compose up -d --build
```
## Session Persistence
MyWhoosh auth is persisted in two places under the mounted `./data` directory:
```text
./data/browser
./data/mywhoosh_auth_state.json
```
Keep that directory when recreating containers. Use `docker compose down` when switching between debug and normal mode, but do not use `docker compose down -v` and do not delete `./data` unless you want to log in again.
If Chromium reports that the profile is locked after a crash, stop all containers and remove only these root files:
```text
./data/browser/SingletonCookie
./data/browser/SingletonLock
./data/browser/SingletonSocket
```
## Notes
MyWhoosh does not appear to publish a stable public activity export API. This project uses a persisted headless browser session and conservative hourly polling. If MyWhoosh changes its page structure, adjust these `.env` values before changing code:
```env
MYWHOOSH_LOGIN_URL=
MYWHOOSH_ACTIVITY_URL=
MYWHOOSH_DOWNLOAD_TEXT_HINTS=fit,download
MYWHOOSH_ACTIVITIES_BUTTON_TEXT=ACTIVITIES
MYWHOOSH_DOWNLOAD_BUTTON_SELECTOR=.btnDownload
```
The FIT conversion patches `file_id` and `device_info` messages where Garmin manufacturer/product fields are present, then recalculates header and file CRCs. The default Garmin product ID is configurable so it can be corrected without a rebuild if Garmin FIT profile values change.
This automates personal account actions. Keep polling conservative and make sure the way you use it is compatible with the services' terms.

25
docker-compose.debug.yml Normal file
View File

@@ -0,0 +1,25 @@
services:
sync-debug:
build:
context: .
args:
BROWSER_DEBUG_TOOLS: "true"
env_file:
- .env
environment:
DATA_DIR: /data
DRY_RUN: "true"
LOG_LEVEL: DEBUG
MYWHOOSH_HEADLESS: "false"
MYWHOOSH_SLOW_MO_MS: "250"
MYWHOOSH_MANUAL_LOGIN_WAIT_SECONDS: "900"
MYWHOOSH_DEBUG_SCREENSHOTS: "true"
volumes:
- ./data:/data
ports:
- "127.0.0.1:6080:6080"
- "127.0.0.1:5900:5900"
shm_size: "1gb"
restart: "no"
entrypoint: ["/app/scripts/browser-debug.sh"]
command: ["run-once"]

11
docker-compose.yml Normal file
View File

@@ -0,0 +1,11 @@
services:
sync:
build: .
env_file:
- .env
volumes:
- ./data:/data
ports:
- "127.0.0.1:8080:8080"
restart: unless-stopped
command: ["serve"]

26
pyproject.toml Normal file
View File

@@ -0,0 +1,26 @@
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[project]
name = "mywhoosh-garmin-sync"
version = "0.1.0"
description = "Sync MyWhoosh FIT activities to Garmin Connect with local device metadata conversion."
requires-python = ">=3.12"
dependencies = [
"curl_cffi>=0.11,<1",
"garminconnect>=0.3.3,<0.4",
"playwright>=1.52,<2",
"python-dotenv>=1.0,<2",
]
[project.scripts]
mywhoosh-garmin-sync = "mywhoosh_garmin_sync.cli:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["src"]
addopts = "-q"

3
requirements-dev.txt Normal file
View File

@@ -0,0 +1,3 @@
-r requirements.txt
pytest>=8.0,<9

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
curl_cffi>=0.11,<1
garminconnect>=0.3.3,<0.4
playwright>=1.52,<2
python-dotenv>=1.0,<2

16
scripts/browser-debug.sh Normal file
View File

@@ -0,0 +1,16 @@
#!/bin/sh
set -eu
export DISPLAY="${DISPLAY:-:99}"
VNC_PORT="${VNC_PORT:-5900}"
NOVNC_PORT="${NOVNC_PORT:-6080}"
VNC_RESOLUTION="${VNC_RESOLUTION:-1440x1000x24}"
Xvfb "$DISPLAY" -screen 0 "$VNC_RESOLUTION" -ac +extension RANDR >/tmp/xvfb.log 2>&1 &
fluxbox >/tmp/fluxbox.log 2>&1 &
x11vnc -display "$DISPLAY" -forever -shared -nopw -rfbport "$VNC_PORT" >/tmp/x11vnc.log 2>&1 &
websockify --web=/usr/share/novnc/ "$NOVNC_PORT" "localhost:$VNC_PORT" >/tmp/novnc.log 2>&1 &
echo "noVNC is available on port $NOVNC_PORT. Open /vnc.html and click Connect."
exec python -m mywhoosh_garmin_sync "$@"

View File

@@ -0,0 +1,4 @@
"""MyWhoosh to Garmin Connect sync service."""
__version__ = "0.1.0"

View File

@@ -0,0 +1,5 @@
from .cli import main
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,65 @@
from __future__ import annotations
import argparse
import asyncio
from pathlib import Path
from .config import Settings
from .fit_device import GarminDevice, convert_fit_device
from .logging_setup import setup_logging
from .service import SyncService
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="mywhoosh-garmin-sync",
description="Download MyWhoosh FIT files, rewrite device metadata, and upload to Garmin Connect.",
)
subparsers = parser.add_subparsers(dest="command")
subparsers.add_parser("serve", help="Run the periodic background sync loop.")
subparsers.add_parser("run-once", help="Run one sync cycle and exit.")
convert_parser = subparsers.add_parser("convert", help="Convert one FIT file locally.")
convert_parser.add_argument("--input", required=True, type=Path)
convert_parser.add_argument("--output", required=True, type=Path)
return parser
async def _run_async(args: argparse.Namespace, settings: Settings) -> int:
if args.command == "run-once":
service = SyncService(settings)
await service.run_once()
return 0
if args.command == "convert":
device = GarminDevice(
manufacturer_id=settings.target_garmin_manufacturer_id,
product_id=settings.target_garmin_product_id,
product_name=settings.target_garmin_product_name,
serial_number=settings.target_garmin_serial_number,
)
result = convert_fit_device(args.input, args.output, device)
print(
f"Converted {args.input} -> {args.output}; "
f"patched {result.patched_field_count} fields"
)
return 0
service = SyncService(settings)
await service.serve()
return 0
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
if args.command is None:
args.command = "serve"
settings = Settings.from_env()
settings.ensure_directories()
setup_logging(settings.log_level)
return asyncio.run(_run_async(args, settings))

View File

@@ -0,0 +1,174 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
from dotenv import load_dotenv
def _bool_env(name: str, default: bool) -> bool:
value = os.getenv(name)
if value is None or value == "":
return default
return value.strip().lower() in {"1", "true", "yes", "y", "on"}
def _int_env(name: str, default: int) -> int:
value = os.getenv(name)
if value is None or value == "":
return default
return int(value)
def _optional_int_env(name: str) -> int | None:
value = os.getenv(name)
if value is None or value.strip() == "":
return None
return int(value)
def _csv_env(name: str, default: list[str]) -> list[str]:
value = os.getenv(name)
if value is None or value.strip() == "":
return default
return [item.strip().lower() for item in value.split(",") if item.strip()]
def _required_env(name: str) -> str:
value = os.getenv(name)
if value is None or value.strip() == "":
raise ValueError(f"Missing required environment variable: {name}")
return value
@dataclass(frozen=True)
class Settings:
mywhoosh_email: str
mywhoosh_password: str
garmin_email: str
garmin_password: str
poll_interval_seconds: int
data_dir: Path
raw_dir: Path
converted_dir: Path
browser_state_dir: Path
mywhoosh_auth_state_path: Path
garmin_tokenstore: Path
db_path: Path
log_level: str
dry_run: bool
dashboard_enabled: bool
dashboard_bind: str
dashboard_port: int
mywhoosh_login_url: str
mywhoosh_activity_url: str
mywhoosh_headless: bool
mywhoosh_timeout_seconds: int
mywhoosh_max_downloads_per_run: int
mywhoosh_download_text_hints: list[str]
mywhoosh_activities_button_text: str
mywhoosh_download_button_selector: str
mywhoosh_slow_mo_ms: int
mywhoosh_manual_login_wait_seconds: int
mywhoosh_debug_screenshots: bool
mywhoosh_debug_dir: Path
garmin_mfa_code: str | None
target_garmin_manufacturer_id: int
target_garmin_product_id: int
target_garmin_product_name: str
target_garmin_serial_number: int | None
@classmethod
def from_env(cls) -> "Settings":
load_dotenv()
data_dir = Path(os.getenv("DATA_DIR", "/data"))
return cls(
mywhoosh_email=_required_env("MYWHOOSH_EMAIL"),
mywhoosh_password=_required_env("MYWHOOSH_PASSWORD"),
garmin_email=_required_env("GARMIN_EMAIL"),
garmin_password=_required_env("GARMIN_PASSWORD"),
poll_interval_seconds=_int_env("POLL_INTERVAL_SECONDS", 3600),
data_dir=data_dir,
raw_dir=Path(os.getenv("RAW_DIR", str(data_dir / "raw"))),
converted_dir=Path(
os.getenv("CONVERTED_DIR", str(data_dir / "converted"))
),
browser_state_dir=Path(
os.getenv("MYWHOOSH_BROWSER_STATE_DIR", str(data_dir / "browser"))
),
mywhoosh_auth_state_path=Path(
os.getenv(
"MYWHOOSH_AUTH_STATE_PATH",
str(data_dir / "mywhoosh_auth_state.json"),
)
),
garmin_tokenstore=Path(
os.getenv("GARMIN_TOKENSTORE", str(data_dir / "garmin_tokens"))
),
db_path=Path(os.getenv("STATE_DB", str(data_dir / "state.sqlite3"))),
log_level=os.getenv("LOG_LEVEL", "INFO").upper(),
dry_run=_bool_env("DRY_RUN", False),
dashboard_enabled=_bool_env("DASHBOARD_ENABLED", True),
dashboard_bind=os.getenv("DASHBOARD_BIND", "0.0.0.0"),
dashboard_port=_int_env("DASHBOARD_PORT", 8080),
mywhoosh_login_url=os.getenv(
"MYWHOOSH_LOGIN_URL", "https://www.mywhoosh.com/login/"
),
mywhoosh_activity_url=os.getenv(
"MYWHOOSH_ACTIVITY_URL", "https://www.mywhoosh.com/profile/"
),
mywhoosh_headless=_bool_env("MYWHOOSH_HEADLESS", True),
mywhoosh_timeout_seconds=_int_env("MYWHOOSH_TIMEOUT_SECONDS", 45),
mywhoosh_max_downloads_per_run=_int_env(
"MYWHOOSH_MAX_DOWNLOADS_PER_RUN", 10
),
mywhoosh_download_text_hints=_csv_env(
"MYWHOOSH_DOWNLOAD_TEXT_HINTS", ["fit", "download"]
),
mywhoosh_activities_button_text=os.getenv(
"MYWHOOSH_ACTIVITIES_BUTTON_TEXT", "ACTIVITIES"
),
mywhoosh_download_button_selector=os.getenv(
"MYWHOOSH_DOWNLOAD_BUTTON_SELECTOR", ".btnDownload"
),
mywhoosh_slow_mo_ms=_int_env("MYWHOOSH_SLOW_MO_MS", 0),
mywhoosh_manual_login_wait_seconds=_int_env(
"MYWHOOSH_MANUAL_LOGIN_WAIT_SECONDS", 0
),
mywhoosh_debug_screenshots=_bool_env(
"MYWHOOSH_DEBUG_SCREENSHOTS", False
),
mywhoosh_debug_dir=Path(
os.getenv("MYWHOOSH_DEBUG_DIR", str(data_dir / "debug"))
),
garmin_mfa_code=os.getenv("GARMIN_MFA_CODE") or None,
target_garmin_manufacturer_id=_int_env(
"TARGET_GARMIN_MANUFACTURER_ID", 1
),
target_garmin_product_id=_int_env("TARGET_GARMIN_PRODUCT_ID", 3578),
target_garmin_product_name=os.getenv(
"TARGET_GARMIN_PRODUCT_NAME", "Edge 1030 Plus"
),
target_garmin_serial_number=_optional_int_env(
"TARGET_GARMIN_SERIAL_NUMBER"
),
)
def ensure_directories(self) -> None:
for path in (
self.data_dir,
self.raw_dir,
self.converted_dir,
self.browser_state_dir,
self.mywhoosh_auth_state_path.parent,
self.garmin_tokenstore,
self.mywhoosh_debug_dir,
self.db_path.parent,
):
path.mkdir(parents=True, exist_ok=True)

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
CRC_TABLE = (
0x0000,
0xCC01,
0xD801,
0x1400,
0xF001,
0x3C00,
0x2800,
0xE401,
0xA001,
0x6C00,
0x7800,
0xB401,
0x5000,
0x9C01,
0x8801,
0x4400,
)
def update_crc(crc: int, byte: int) -> int:
tmp = CRC_TABLE[crc & 0xF]
crc = (crc >> 4) & 0x0FFF
crc = crc ^ tmp ^ CRC_TABLE[byte & 0xF]
tmp = CRC_TABLE[crc & 0xF]
crc = (crc >> 4) & 0x0FFF
crc = crc ^ tmp ^ CRC_TABLE[(byte >> 4) & 0xF]
return crc & 0xFFFF
def fit_crc(data: bytes | bytearray | memoryview) -> int:
crc = 0
for byte in data:
crc = update_crc(crc, byte)
return crc

View File

@@ -0,0 +1,381 @@
from __future__ import annotations
import logging
import struct
from dataclasses import dataclass
from pathlib import Path
from .fit_crc import fit_crc
logger = logging.getLogger(__name__)
FILE_ID_MESG_NUM = 0
DEVICE_INFO_MESG_NUM = 23
GARMIN_MANUFACTURER_ID = 1
@dataclass(frozen=True)
class GarminDevice:
manufacturer_id: int = GARMIN_MANUFACTURER_ID
product_id: int = 3578
product_name: str = "Edge 1030 Plus"
serial_number: int | None = None
@dataclass(frozen=True)
class FitConversionResult:
source_path: Path
output_path: Path
patched_field_count: int
header_crc: int | None
file_crc: int
@dataclass(frozen=True)
class FieldDefinition:
num: int
size: int
base_type: int
@dataclass(frozen=True)
class LocalDefinition:
global_message_num: int
endian: str
fields: tuple[FieldDefinition, ...]
record_size: int
developer_field_size: int
@dataclass(frozen=True)
class DeviceFieldValue:
global_message_num: int
field_num: int
value: int | str
class FitFormatError(ValueError):
"""Raised when a file is not a valid enough FIT file for metadata patching."""
def convert_fit_device(
source_path: Path, output_path: Path, device: GarminDevice | None = None
) -> FitConversionResult:
device = device or GarminDevice()
data = bytearray(source_path.read_bytes())
_validate_fit_container(data)
patched_count = _patch_device_metadata(data, device)
header_crc = _rewrite_header_crc(data)
file_crc = _rewrite_file_crc(data)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_bytes(data)
logger.info(
"Converted FIT metadata for %s -> %s; patched_fields=%d",
source_path,
output_path,
patched_count,
)
return FitConversionResult(
source_path=source_path,
output_path=output_path,
patched_field_count=patched_count,
header_crc=header_crc,
file_crc=file_crc,
)
def is_fit_file(path: Path) -> bool:
try:
data = path.read_bytes()
_validate_fit_container(data)
except (OSError, FitFormatError):
return False
return True
def read_device_field_values(path: Path) -> list[DeviceFieldValue]:
data = bytearray(path.read_bytes())
_validate_fit_container(data)
values: list[DeviceFieldValue] = []
for definition, field_offsets in _iter_data_fields(data):
for field, offset in field_offsets:
if definition.global_message_num == FILE_ID_MESG_NUM and field.num in {
1,
2,
3,
8,
}:
values.append(
DeviceFieldValue(
definition.global_message_num,
field.num,
_read_field_value(data, offset, field, definition.endian),
)
)
if definition.global_message_num == DEVICE_INFO_MESG_NUM and field.num in {
2,
3,
4,
27,
}:
values.append(
DeviceFieldValue(
definition.global_message_num,
field.num,
_read_field_value(data, offset, field, definition.endian),
)
)
return values
def _validate_fit_container(data: bytearray) -> None:
if len(data) < 14:
raise FitFormatError("FIT file is too small")
header_size = data[0]
if header_size not in {12, 14}:
raise FitFormatError(f"Unsupported FIT header size: {header_size}")
if len(data) < header_size + 2:
raise FitFormatError("FIT file is shorter than its header")
if bytes(data[8:12]) != b".FIT":
raise FitFormatError("Missing .FIT signature")
data_size = struct.unpack_from("<I", data, 4)[0]
expected_size = header_size + data_size + 2
if len(data) != expected_size:
raise FitFormatError(
f"FIT size mismatch: header says {expected_size} bytes, file has {len(data)}"
)
if header_size == 14:
expected_header_crc = struct.unpack_from("<H", data, 12)[0]
actual_header_crc = fit_crc(data[:12])
if expected_header_crc != actual_header_crc:
raise FitFormatError("FIT header CRC check failed")
expected_file_crc = struct.unpack_from("<H", data, len(data) - 2)[0]
actual_file_crc = fit_crc(data[:-2])
if expected_file_crc != actual_file_crc:
raise FitFormatError("FIT file CRC check failed")
def _patch_device_metadata(data: bytearray, device: GarminDevice) -> int:
patched_count = 0
eligible_field_count = 0
for definition, field_offsets in _iter_data_fields(data):
for field, offset in field_offsets:
target_value: int | str | None = None
if definition.global_message_num == FILE_ID_MESG_NUM:
if field.num == 1:
target_value = device.manufacturer_id
elif field.num == 2:
target_value = device.product_id
elif field.num == 3 and device.serial_number is not None:
target_value = device.serial_number
elif field.num == 8:
target_value = device.product_name
elif definition.global_message_num == DEVICE_INFO_MESG_NUM:
if field.num == 2:
target_value = device.manufacturer_id
elif field.num == 3 and device.serial_number is not None:
target_value = device.serial_number
elif field.num == 4:
target_value = device.product_id
elif field.num == 27:
target_value = device.product_name
if target_value is not None:
eligible_field_count += 1
if _write_field_value(data, offset, field, definition.endian, target_value):
patched_count += 1
if eligible_field_count == 0:
raise FitFormatError("No writable file_id or device_info device fields found")
return patched_count
def _iter_data_fields(
data: bytearray,
) -> list[tuple[LocalDefinition, list[tuple[FieldDefinition, int]]]]:
header_size = data[0]
data_size = struct.unpack_from("<I", data, 4)[0]
offset = header_size
end_offset = header_size + data_size
definitions: dict[int, LocalDefinition] = {}
data_records: list[tuple[LocalDefinition, list[tuple[FieldDefinition, int]]]] = []
while offset < end_offset:
record_header = data[offset]
offset += 1
if record_header & 0x80:
local_message_type = (record_header >> 5) & 0x03
definition = definitions.get(local_message_type)
if definition is None:
raise FitFormatError(
f"Compressed timestamp record used unknown local definition {local_message_type}"
)
field_offsets, offset = _collect_field_offsets(definition, offset)
data_records.append((definition, field_offsets))
continue
local_message_type = record_header & 0x0F
is_definition = bool(record_header & 0x40)
has_developer_fields = bool(record_header & 0x20)
if is_definition:
definition, offset = _read_definition(
data, offset, local_message_type, has_developer_fields
)
definitions[local_message_type] = definition
continue
definition = definitions.get(local_message_type)
if definition is None:
raise FitFormatError(
f"Data record used unknown local definition {local_message_type}"
)
field_offsets, offset = _collect_field_offsets(definition, offset)
data_records.append((definition, field_offsets))
if offset != end_offset:
raise FitFormatError("FIT parser did not end on data boundary")
return data_records
def _read_definition(
data: bytearray,
offset: int,
local_message_type: int,
has_developer_fields: bool,
) -> tuple[LocalDefinition, int]:
del local_message_type
if offset + 5 > len(data):
raise FitFormatError("Truncated FIT definition message")
offset += 1
architecture = data[offset]
offset += 1
endian = ">" if architecture == 1 else "<"
global_message_num = struct.unpack_from(f"{endian}H", data, offset)[0]
offset += 2
field_count = data[offset]
offset += 1
fields: list[FieldDefinition] = []
record_size = 0
for _ in range(field_count):
if offset + 3 > len(data):
raise FitFormatError("Truncated FIT field definition")
field = FieldDefinition(
num=data[offset],
size=data[offset + 1],
base_type=data[offset + 2],
)
fields.append(field)
record_size += field.size
offset += 3
developer_field_size = 0
if has_developer_fields:
if offset >= len(data):
raise FitFormatError("Truncated FIT developer field count")
developer_field_count = data[offset]
offset += 1
for _ in range(developer_field_count):
if offset + 3 > len(data):
raise FitFormatError("Truncated FIT developer fields")
developer_field_size += data[offset + 1]
offset += 3
record_size += developer_field_size
return (
LocalDefinition(
global_message_num=global_message_num,
endian=endian,
fields=tuple(fields),
record_size=record_size,
developer_field_size=developer_field_size,
),
offset,
)
def _collect_field_offsets(
definition: LocalDefinition, offset: int
) -> tuple[list[tuple[FieldDefinition, int]], int]:
field_offsets: list[tuple[FieldDefinition, int]] = []
current_offset = offset
for field in definition.fields:
field_offsets.append((field, current_offset))
current_offset += field.size
current_offset += definition.developer_field_size
return field_offsets, current_offset
def _read_field_value(
data: bytearray, offset: int, field: FieldDefinition, endian: str
) -> int | str:
base_type = field.base_type & 0x1F
if base_type in {0x03, 0x04, 0x0B} and field.size >= 2:
return struct.unpack_from(f"{endian}H", data, offset)[0]
if base_type in {0x05, 0x06, 0x0C} and field.size >= 4:
return struct.unpack_from(f"{endian}I", data, offset)[0]
if base_type == 0x07:
raw = bytes(data[offset : offset + field.size])
if 0 in raw:
raw = raw[: raw.index(0)]
return raw.decode("utf-8", errors="replace")
raw = bytes(data[offset : offset + field.size])
return int.from_bytes(raw, "little")
def _write_field_value(
data: bytearray,
offset: int,
field: FieldDefinition,
endian: str,
value: int | str,
) -> bool:
if isinstance(value, str):
encoded = value.encode("utf-8")
if not encoded or field.size == 0 or len(encoded) + 1 > field.size:
return False
replacement = encoded + b"\x00" + b"\x00" * (field.size - len(encoded) - 1)
if bytes(data[offset : offset + field.size]) == replacement:
return False
data[offset : offset + field.size] = replacement
return True
if field.size == 1:
replacement = struct.pack("B", value)
elif field.size == 2:
replacement = struct.pack(f"{endian}H", value)
elif field.size == 4:
replacement = struct.pack(f"{endian}I", value)
else:
return False
if bytes(data[offset : offset + field.size]) == replacement:
return False
data[offset : offset + field.size] = replacement
return True
def _rewrite_header_crc(data: bytearray) -> int | None:
header_size = data[0]
if header_size != 14:
return None
header_crc = fit_crc(data[:12])
struct.pack_into("<H", data, 12, header_crc)
return header_crc
def _rewrite_file_crc(data: bytearray) -> int:
file_crc = fit_crc(data[:-2])
struct.pack_into("<H", data, len(data) - 2, file_crc)
return file_crc

View File

@@ -0,0 +1,136 @@
from __future__ import annotations
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Protocol
from .config import Settings
logger = logging.getLogger(__name__)
class GarminClientProtocol(Protocol):
def login(self, tokenstore: str | None = None) -> Any:
...
def upload_activity(self, activity_path: str) -> Any:
...
@dataclass(frozen=True)
class UploadResult:
status: str
duplicate: bool
garmin_activity_id: str | None
raw_response: Any
class GarminUploadBlocked(RuntimeError):
"""Raised when Garmin login needs user action such as MFA."""
class GarminUploader:
def __init__(
self,
settings: Settings,
client_factory: Callable[..., GarminClientProtocol] | None = None,
) -> None:
self.settings = settings
self._client_factory = client_factory
self._client: GarminClientProtocol | None = None
def upload(self, fit_path: Path) -> UploadResult:
client = self._ensure_client()
try:
response = client.upload_activity(str(fit_path))
except Exception as exc:
if _looks_duplicate_error(exc):
logger.info("Garmin already has activity for %s", fit_path)
return UploadResult(
status="duplicate",
duplicate=True,
garmin_activity_id=None,
raw_response=str(exc),
)
raise
return UploadResult(
status="uploaded",
duplicate=False,
garmin_activity_id=_extract_activity_id(response),
raw_response=response,
)
def _ensure_client(self) -> GarminClientProtocol:
if self._client is not None:
return self._client
factory = self._client_factory or _default_garmin_factory
client = factory(
self.settings.garmin_email,
self.settings.garmin_password,
prompt_mfa=self._prompt_mfa,
)
try:
client.login(str(self.settings.garmin_tokenstore))
except RuntimeError:
raise
except Exception as exc:
if "mfa" in str(exc).lower():
raise GarminUploadBlocked(
"Garmin MFA is required. Set GARMIN_MFA_CODE for one run."
) from exc
raise
self._client = client
return client
def _prompt_mfa(self) -> str:
if self.settings.garmin_mfa_code:
return self.settings.garmin_mfa_code
raise GarminUploadBlocked(
"Garmin requested MFA but GARMIN_MFA_CODE is not set."
)
def _default_garmin_factory(*args: Any, **kwargs: Any) -> GarminClientProtocol:
from garminconnect import Garmin
return Garmin(*args, **kwargs)
def _looks_duplicate_error(exc: Exception) -> bool:
text = str(exc).lower()
return any(token in text for token in ("duplicate", "already exists", "409"))
def _extract_activity_id(response: Any) -> str | None:
if not isinstance(response, dict):
return None
candidates = [
response.get("activityId"),
response.get("activity_id"),
response.get("id"),
]
detailed_import = response.get("detailedImportResult")
if isinstance(detailed_import, dict):
candidates.extend(
[
detailed_import.get("uploadId"),
detailed_import.get("activityId"),
]
)
for key in ("successes", "success", "importedActivities"):
items = response.get(key)
if isinstance(items, list) and items:
first = items[0]
if isinstance(first, dict):
candidates.extend([first.get("activityId"), first.get("id")])
for candidate in candidates:
if candidate is not None:
return str(candidate)
return None

View File

@@ -0,0 +1,11 @@
from __future__ import annotations
import logging
def setup_logging(level: str = "INFO") -> None:
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)

View File

@@ -0,0 +1,21 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class DownloadCandidate:
source_ref: str
title: str
href: str | None
element_index: int
click_selector: str
@dataclass(frozen=True)
class DownloadedActivity:
source_ref: str
title: str
url: str | None
raw_path: Path

View File

@@ -0,0 +1,759 @@
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import re
from datetime import UTC, datetime
from pathlib import Path
from typing import Callable
from urllib.parse import urljoin, urlparse
from playwright.async_api import BrowserContext, Page, TimeoutError, async_playwright
from .config import Settings
from .fit_device import is_fit_file
from .models import DownloadCandidate, DownloadedActivity
logger = logging.getLogger(__name__)
COOKIE_ACCEPT_TEXT_PATTERN = (
r"^(accept( all)?( cookies?)?|allow all( cookies?)?|agree|i agree|got it|ok|okay)$"
)
COOKIE_ACCEPT_TEXT_RE = re.compile(COOKIE_ACCEPT_TEXT_PATTERN, re.I)
COOKIE_ACCEPT_SELECTORS = (
"#onetrust-accept-btn-handler",
"button#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
"button.cky-btn-accept",
"button[id*='accept'][id*='cookie' i]",
"button[id*='cookie'][id*='accept' i]",
"button[class*='accept'][class*='cookie' i]",
"button[class*='cookie'][class*='accept' i]",
"button[data-testid*='accept' i]",
"button[data-test*='accept' i]",
"[role='button'][aria-label*='accept' i]",
)
COOKIE_ACCEPT_JS_PATTERN = (
r"^(accept( all)?( cookies?)?|allow all( cookies?)?|agree|i agree|got it|ok|okay)$"
)
CHALLENGE_TEXT_TOKENS = (
"captcha",
"recaptcha",
"hcaptcha",
"turnstile",
"not a robot",
"i'm not a robot",
"i am not a robot",
"verify you are human",
"verify that you are human",
"checking your browser",
"security check",
"cloudflare",
"cf-challenge",
"cf-turnstile",
)
CHALLENGE_URL_TOKENS = (
"captcha",
"recaptcha",
"hcaptcha",
"turnstile",
"challenge",
"cloudflare",
"verify",
)
class MyWhooshCrawler:
def __init__(self, settings: Settings) -> None:
self.settings = settings
async def download_new_activities(
self, should_skip_source: Callable[[str], bool]
) -> list[DownloadedActivity]:
timeout_ms = self.settings.mywhoosh_timeout_seconds * 1000
async with async_playwright() as playwright:
context = await playwright.chromium.launch_persistent_context(
user_data_dir=str(self.settings.browser_state_dir),
headless=self.settings.mywhoosh_headless,
accept_downloads=True,
args=["--no-sandbox", "--disable-dev-shm-usage"],
slow_mo=self.settings.mywhoosh_slow_mo_ms,
timeout=timeout_ms,
viewport={"width": 1440, "height": 1000},
)
page: Page | None = None
try:
await self._restore_auth_state(context)
page = context.pages[0] if context.pages else await context.new_page()
page.set_default_timeout(timeout_ms)
await self._ensure_logged_in(page)
candidates = await self._find_candidates(page)
logger.info("Found %d possible MyWhoosh download links", len(candidates))
downloaded: list[DownloadedActivity] = []
for candidate in candidates:
if len(downloaded) >= self.settings.mywhoosh_max_downloads_per_run:
break
if should_skip_source(candidate.source_ref):
logger.debug("Skipping already terminal source %s", candidate.source_ref)
continue
try:
activity = await self._download_candidate(
context, page, candidate
)
except Exception:
logger.exception(
"Failed downloading MyWhoosh candidate %s",
candidate.source_ref,
)
continue
if activity is not None:
downloaded.append(activity)
return downloaded
finally:
await self._save_auth_state(context)
await context.close()
async def _ensure_logged_in(self, page: Page) -> None:
await self._goto(page, self.settings.mywhoosh_activity_url)
await self._dismiss_cookie_banner(page)
await self._handle_challenge_if_needed(page, "activity-before-login")
await self._debug_screenshot(page, "activity-before-login")
if not await self._login_form_visible(page):
return
logger.info("Logging in to MyWhoosh")
await self._goto(page, self.settings.mywhoosh_login_url)
await self._dismiss_cookie_banner(page)
await self._handle_challenge_if_needed(page, "login-page")
await self._debug_screenshot(page, "login-page")
email_selector = (
'input[type="email"], input[name*="email" i], input[name*="user" i], '
'input[autocomplete="username"]'
)
password_selector = 'input[type="password"], input[autocomplete="current-password"]'
try:
await page.locator(email_selector).first.fill(self.settings.mywhoosh_email)
await page.locator(password_selector).first.fill(
self.settings.mywhoosh_password
)
await self._dismiss_cookie_banner(page)
submit = page.locator(
'button[type="submit"], input[type="submit"], button:has-text("Login"), '
'button:has-text("Log in"), button:has-text("Sign in")'
).first
await submit.click()
except TimeoutError:
if self.settings.mywhoosh_manual_login_wait_seconds <= 0:
raise
logger.warning(
"Could not complete automatic MyWhoosh login. Waiting for manual login."
)
await self._wait_for_manual_login(page)
return
await self._wait_for_quiet(page)
await self._dismiss_cookie_banner(page)
await self._handle_challenge_if_needed(page, "after-login-submit")
await self._debug_screenshot(page, "after-login-submit")
if self.settings.mywhoosh_manual_login_wait_seconds > 0:
await self._wait_for_manual_login(page)
await self._goto(page, self.settings.mywhoosh_activity_url)
await self._dismiss_cookie_banner(page)
await self._handle_challenge_if_needed(page, "activity-after-login")
if await self._login_form_visible(page):
if self.settings.mywhoosh_manual_login_wait_seconds > 0:
await self._wait_for_manual_login(page)
await self._goto(page, self.settings.mywhoosh_activity_url)
await self._dismiss_cookie_banner(page)
await self._handle_challenge_if_needed(page, "activity-after-manual-login")
if not await self._login_form_visible(page):
return
raise RuntimeError("MyWhoosh login did not complete; check credentials or MFA")
async def _find_candidates(self, page: Page) -> list[DownloadCandidate]:
await self._goto(page, self.settings.mywhoosh_activity_url)
await self._dismiss_cookie_banner(page)
await self._handle_challenge_if_needed(page, "activity-download-scan")
await self._open_activities_view(page)
await self._debug_screenshot(page, "activity-download-scan")
hints = self.settings.mywhoosh_download_text_hints
raw_items = await page.evaluate(
"""
({ hints, downloadSelector }) => {
const safeQueryAll = (selector) => {
if (!selector) {
return [];
}
try {
return Array.from(document.querySelectorAll(selector));
} catch {
return [];
}
};
const safeMatches = (el, selector) => {
if (!selector) {
return false;
}
try {
return el.matches(selector);
} catch {
return false;
}
};
const priorityElements = safeQueryAll(downloadSelector);
const generalElements = Array.from(
document.querySelectorAll('a, button, [role="button"]')
);
const elements = [];
const seen = new Set();
for (const el of [...priorityElements, ...generalElements]) {
if (seen.has(el)) {
continue;
}
seen.add(el);
elements.push(el);
}
return elements.map((el, index) => {
const marker = `mywhoosh-sync-${index}`;
el.setAttribute('data-mywhoosh-sync-index', marker);
const style = window.getComputedStyle(el);
const rect = el.getBoundingClientRect();
const visible = style.visibility !== 'hidden'
&& style.display !== 'none'
&& rect.width > 0
&& rect.height > 0;
const href = el.href || el.getAttribute('href') || null;
const text = (el.innerText || el.textContent || '').trim();
const aria = el.getAttribute('aria-label') || '';
const download = el.getAttribute('download') || '';
const className = typeof el.className === 'string' ? el.className : '';
const row = el.closest(
'tr, li, article, [role="row"], .activity, .activity-row, '
+ '.activity-card, .ride, .ride-row, .workout, .card'
);
const rowText = row ? (row.innerText || row.textContent || '').trim() : '';
const matchesDownloadSelector = safeMatches(el, downloadSelector);
const haystack = `${href || ''} ${text} ${aria} ${download} ${className} ${rowText}`.toLowerCase();
return {
index,
href,
text,
aria,
download,
className,
rowText,
matchesDownloadSelector,
visible,
haystack,
clickSelector: `[data-mywhoosh-sync-index="${marker}"]`,
};
}).filter((item) => {
return item.visible
&& (
item.matchesDownloadSelector
|| hints.some((hint) => item.haystack.includes(hint))
);
});
}
""",
{
"hints": hints,
"downloadSelector": self.settings.mywhoosh_download_button_selector,
},
)
first_selector_match = next(
(item for item in raw_items if item.get("matchesDownloadSelector")), None
)
if first_selector_match is not None:
raw_items = [first_selector_match]
candidates: list[DownloadCandidate] = []
seen: set[str] = set()
for item in raw_items:
if _looks_like_app_download(item.get("haystack", "")):
continue
href = item.get("href")
text = item.get("text") or item.get("aria") or item.get("download") or "activity"
absolute_href = (
urljoin(self.settings.mywhoosh_activity_url, href)
if href and not href.startswith("javascript:")
else None
)
row_text = item.get("rowText") or ""
source_ref = _source_ref(absolute_href, text, item["index"], row_text)
if source_ref in seen:
continue
seen.add(source_ref)
candidates.append(
DownloadCandidate(
source_ref=source_ref,
title=_clean_title(row_text or text),
href=absolute_href,
element_index=item["index"],
click_selector=item["clickSelector"],
)
)
return candidates
async def _download_candidate(
self, context: BrowserContext, page: Page, candidate: DownloadCandidate
) -> DownloadedActivity | None:
if candidate.href:
downloaded = await self._download_direct(context, candidate)
if downloaded is not None:
return downloaded
locator = page.locator(candidate.click_selector).first
try:
async with page.expect_download(
timeout=self.settings.mywhoosh_timeout_seconds * 1000
) as download_info:
await locator.click()
download = await download_info.value
suggested = download.suggested_filename or f"{candidate.source_ref}.fit"
raw_path = self._raw_output_path(candidate, suggested)
await download.save_as(raw_path)
except TimeoutError:
logger.warning("Clicking candidate did not produce a download: %s", candidate)
return None
if not is_fit_file(raw_path):
raw_path.unlink(missing_ok=True)
logger.warning("Downloaded file was not a valid FIT file: %s", raw_path)
return None
return DownloadedActivity(
source_ref=candidate.source_ref,
title=candidate.title,
url=candidate.href,
raw_path=raw_path,
)
async def _download_direct(
self, context: BrowserContext, candidate: DownloadCandidate
) -> DownloadedActivity | None:
if candidate.href is None:
return None
response = await context.request.get(candidate.href)
if not response.ok:
logger.debug("Direct download failed for %s: %s", candidate.href, response.status)
return None
body = await response.body()
if len(body) < 14 or body[8:12] != b".FIT":
return None
filename = _filename_from_headers(
response.headers.get("content-disposition", "")
) or _filename_from_url(candidate.href) or f"{candidate.source_ref}.fit"
raw_path = self._raw_output_path(candidate, filename)
raw_path.write_bytes(body)
if not is_fit_file(raw_path):
raw_path.unlink(missing_ok=True)
logger.warning("Direct response was FIT-like but invalid: %s", candidate.href)
return None
return DownloadedActivity(
source_ref=candidate.source_ref,
title=candidate.title,
url=candidate.href,
raw_path=raw_path,
)
def _raw_output_path(self, candidate: DownloadCandidate, filename: str) -> Path:
safe_name = _safe_filename(filename)
if not safe_name.lower().endswith(".fit"):
safe_name = f"{safe_name}.fit"
return self.settings.raw_dir / f"{candidate.source_ref}_{safe_name}"
async def _goto(self, page: Page, url: str) -> None:
await page.goto(url, wait_until="domcontentloaded")
await self._wait_for_quiet(page)
async def _wait_for_quiet(self, page: Page) -> None:
try:
await page.wait_for_load_state("networkidle", timeout=10_000)
except TimeoutError:
await asyncio.sleep(1)
async def _restore_auth_state(self, context: BrowserContext) -> None:
path = self.settings.mywhoosh_auth_state_path
if not path.exists():
return
try:
state = json.loads(path.read_text(encoding="utf-8"))
cookies = state.get("cookies") or []
origins = state.get("origins") or []
if cookies:
await context.add_cookies(cookies)
if origins:
await context.add_init_script(_storage_restore_script(origins))
logger.info("Restored MyWhoosh auth state from %s", path)
except Exception:
logger.exception("Failed restoring MyWhoosh auth state from %s", path)
async def _save_auth_state(self, context: BrowserContext) -> None:
path = self.settings.mywhoosh_auth_state_path
try:
state = await context.storage_state()
state.setdefault("origins", [])
for browser_page in context.pages:
if browser_page.is_closed():
continue
await self._capture_page_storage(browser_page, state)
path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = path.with_suffix(f"{path.suffix}.tmp")
tmp_path.write_text(json.dumps(state, indent=2), encoding="utf-8")
tmp_path.replace(path)
logger.info("Saved MyWhoosh auth state to %s", path)
except Exception:
logger.exception("Failed saving MyWhoosh auth state to %s", path)
async def _capture_page_storage(self, page: Page, state: dict) -> None:
try:
storage = await page.evaluate(
"""
() => {
const entries = (storage) => {
const items = [];
for (let index = 0; index < storage.length; index += 1) {
const name = storage.key(index);
items.push({ name, value: storage.getItem(name) });
}
return items;
};
return {
origin: window.location.origin,
localStorage: entries(window.localStorage),
sessionStorage: entries(window.sessionStorage),
};
}
"""
)
except Exception:
return
origin = storage.get("origin")
if not origin or origin == "null":
return
origins = state.setdefault("origins", [])
existing = next(
(item for item in origins if item.get("origin") == origin),
None,
)
if existing is None:
origins.append(storage)
return
existing["localStorage"] = storage.get("localStorage", [])
existing["sessionStorage"] = storage.get("sessionStorage", [])
async def _login_form_visible(self, page: Page) -> bool:
try:
return await page.locator('input[type="password"]').first.is_visible()
except Exception:
return False
async def _open_activities_view(self, page: Page) -> None:
label = self.settings.mywhoosh_activities_button_text.strip()
if not label:
return
label_re = re.compile(rf"^\s*{re.escape(label)}\s*$", re.I)
locators = (
page.get_by_role("button", name=label_re),
page.get_by_role("link", name=label_re),
page.get_by_role("tab", name=label_re),
page.locator("a, button, [role='button'], [role='tab']").filter(
has_text=label_re
),
)
for locator in locators:
if await self._click_first_visible(locator):
logger.info("Opened MyWhoosh %s view", label)
await self._wait_for_quiet(page)
await self._dismiss_cookie_banner(page)
await self._handle_challenge_if_needed(page, "after-activities-click")
await self._debug_screenshot(page, "after-activities-click")
return
logger.debug("No visible MyWhoosh %s button found", label)
async def _dismiss_cookie_banner(self, page: Page) -> None:
clicked = False
for _ in range(3):
clicked_this_round = False
for selector in COOKIE_ACCEPT_SELECTORS:
if await self._click_first_visible(page.locator(selector)):
clicked = True
clicked_this_round = True
break
if not clicked_this_round and await self._click_first_visible(
page.get_by_role("button", name=COOKIE_ACCEPT_TEXT_RE)
):
clicked = True
clicked_this_round = True
if not clicked_this_round and await self._click_cookie_banner_by_script(page):
clicked = True
clicked_this_round = True
if not clicked_this_round:
break
await asyncio.sleep(0.5)
if clicked:
logger.info("Accepted MyWhoosh cookie consent")
await self._wait_for_quiet(page)
async def _click_first_visible(self, locator) -> bool:
try:
count = await locator.count()
except Exception:
count = 1
for index in range(min(count, 5)):
try:
item = locator.nth(index)
if await item.is_visible(timeout=500):
await item.click(timeout=1500)
return True
except Exception:
continue
return False
async def _click_cookie_banner_by_script(self, page: Page) -> bool:
try:
return bool(
await page.evaluate(
"""
(pattern) => {
const re = new RegExp(pattern, 'i');
const candidates = Array.from(document.querySelectorAll(
'button, [role="button"], input[type="button"], input[type="submit"]'
));
for (const element of candidates) {
const style = window.getComputedStyle(element);
const rect = element.getBoundingClientRect();
const visible = style.visibility !== 'hidden'
&& style.display !== 'none'
&& rect.width > 0
&& rect.height > 0;
if (!visible) {
continue;
}
const label = (
element.innerText
|| element.value
|| element.getAttribute('aria-label')
|| ''
).replace(/\\s+/g, ' ').trim().toLowerCase();
if (re.test(label)) {
element.click();
return true;
}
}
return false;
}
""",
COOKIE_ACCEPT_JS_PATTERN,
)
)
except Exception:
return False
async def _handle_challenge_if_needed(self, page: Page, stage: str) -> None:
if not await self._challenge_visible(page):
return
await self._debug_screenshot(page, f"challenge-{_safe_debug_name(stage)}")
if self.settings.mywhoosh_manual_login_wait_seconds <= 0:
raise RuntimeError(
"MyWhoosh presented a captcha/bot challenge. Run "
"`docker compose -f docker-compose.debug.yml up --build`, open noVNC, "
"solve the challenge manually, then rerun the normal service."
)
logger.warning(
"MyWhoosh presented a captcha/bot challenge at %s. Waiting for manual solve.",
stage,
)
await self._wait_for_manual_login(page)
async def _challenge_visible(self, page: Page) -> bool:
if _looks_like_challenge_url(page.url):
return True
try:
challenge_signal = await page.evaluate(
"""
() => {
const frameText = Array.from(document.querySelectorAll('iframe'))
.map((frame) => [
frame.getAttribute('src') || '',
frame.getAttribute('title') || '',
frame.getAttribute('name') || '',
frame.getAttribute('id') || ''
].join(' '))
.join('\\n');
return [
document.title || '',
document.body?.innerText || '',
frameText
].join('\\n');
}
"""
)
except Exception:
return False
return _looks_like_challenge_text(challenge_signal)
async def _wait_for_manual_login(self, page: Page) -> None:
seconds = self.settings.mywhoosh_manual_login_wait_seconds
logger.warning(
"Manual MyWhoosh login window is open for %d seconds. "
"Use the visible browser to accept cookies, solve challenges, and finish login.",
seconds,
)
await self._debug_screenshot(page, "manual-login-start")
deadline = asyncio.get_running_loop().time() + seconds
while asyncio.get_running_loop().time() < deadline:
if await self._manual_login_complete(page):
await self._wait_for_quiet(page)
logger.info("Manual MyWhoosh login appears complete")
await self._debug_screenshot(page, "manual-login-complete")
return
await asyncio.sleep(2)
raise RuntimeError("Timed out waiting for manual MyWhoosh login")
async def _manual_login_complete(self, page: Page) -> bool:
if await self._login_form_visible(page):
return False
if await self._challenge_visible(page):
return False
current_url = page.url.lower()
blocking_url_tokens = ("login", "signin", "sign-in", "auth", "captcha", "challenge")
return not any(token in current_url for token in blocking_url_tokens)
async def _debug_screenshot(self, page: Page, name: str) -> None:
if not self.settings.mywhoosh_debug_screenshots:
return
timestamp = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
path = self.settings.mywhoosh_debug_dir / f"{timestamp}_{name}.png"
try:
await page.screenshot(path=str(path), full_page=True)
logger.debug("Saved debug screenshot %s", path)
except Exception:
logger.exception("Failed saving debug screenshot %s", path)
def _source_ref(href: str | None, text: str, index: int, row_text: str = "") -> str:
stable = href or f"{row_text or text}|{index}"
return hashlib.sha256(stable.encode("utf-8")).hexdigest()[:24]
def _storage_restore_script(origins: list[dict]) -> str:
origins_json = json.dumps(origins)
return f"""
(() => {{
const origins = {origins_json};
const originState = origins.find((item) => item.origin === window.location.origin);
if (!originState) {{
return;
}}
const restore = (storage, entries) => {{
for (const entry of entries || []) {{
if (entry && entry.name !== null && entry.value !== null) {{
storage.setItem(entry.name, entry.value);
}}
}}
}};
try {{
restore(window.localStorage, originState.localStorage);
restore(window.sessionStorage, originState.sessionStorage);
}} catch {{
// Ignore blocked storage access. The persisted Chromium profile is still available.
}}
}})();
"""
def _clean_title(value: str) -> str:
value = re.sub(r"\s+", " ", value).strip()
return value[:120] or "MyWhoosh activity"
def _looks_like_cookie_accept_label(value: str) -> bool:
normalized = re.sub(r"\s+", " ", value).strip().lower()
return bool(COOKIE_ACCEPT_TEXT_RE.match(normalized))
def _looks_like_challenge_text(value: str) -> bool:
normalized = _normalize_challenge_signal(value)
return any(token in normalized for token in CHALLENGE_TEXT_TOKENS)
def _looks_like_challenge_url(url: str) -> bool:
normalized = _normalize_challenge_signal(url)
return any(token in normalized for token in CHALLENGE_URL_TOKENS)
def _normalize_challenge_signal(value: str) -> str:
value = value.replace("\u2019", "'")
return re.sub(r"\s+", " ", value).strip().lower()
def _looks_like_app_download(haystack: str) -> bool:
return any(
token in haystack
for token in (
"download app",
"app store",
"google play",
"play.google.com",
"apps.apple.com",
"windows",
"macos",
"android",
"ios",
)
)
def _safe_filename(value: str) -> str:
value = _clean_title(value)
value = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("._")
return value or "activity.fit"
def _safe_debug_name(value: str) -> str:
value = re.sub(r"[^A-Za-z0-9._-]+", "-", value).strip("-")
return value or "page"
def _filename_from_headers(content_disposition: str) -> str | None:
match = re.search(r'filename\*?=(?:UTF-8\'\')?"?([^";]+)"?', content_disposition)
if not match:
return None
return match.group(1)
def _filename_from_url(url: str | None) -> str | None:
if not url:
return None
name = Path(urlparse(url).path).name
return name or None

View File

@@ -0,0 +1,133 @@
from __future__ import annotations
import asyncio
import logging
import signal
from pathlib import Path
from .config import Settings
from .fit_device import GarminDevice, convert_fit_device
from .garmin import GarminUploadBlocked, GarminUploader
from .models import DownloadedActivity
from .mywhoosh import MyWhooshCrawler
from .state import ActivityStore, sha256_file
logger = logging.getLogger(__name__)
class SyncService:
def __init__(
self,
settings: Settings,
crawler: MyWhooshCrawler | None = None,
uploader: GarminUploader | None = None,
store: ActivityStore | None = None,
) -> None:
self.settings = settings
self.crawler = crawler or MyWhooshCrawler(settings)
self.uploader = uploader or GarminUploader(settings)
self.store = store or ActivityStore(settings.db_path)
self.device = GarminDevice(
manufacturer_id=settings.target_garmin_manufacturer_id,
product_id=settings.target_garmin_product_id,
product_name=settings.target_garmin_product_name,
serial_number=settings.target_garmin_serial_number,
)
async def serve(self) -> None:
stop_event = asyncio.Event()
try:
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, stop_event.set)
except (NotImplementedError, RuntimeError):
pass
logger.info(
"Starting sync loop; interval=%ss dry_run=%s",
self.settings.poll_interval_seconds,
self.settings.dry_run,
)
while not stop_event.is_set():
try:
await self.run_once()
except Exception:
logger.exception("Sync cycle failed")
try:
await asyncio.wait_for(
stop_event.wait(), timeout=self.settings.poll_interval_seconds
)
except TimeoutError:
continue
async def run_once(self) -> None:
self.store.initialize()
downloads = await self.crawler.download_new_activities(
self.store.is_terminal_source
)
logger.info("Downloaded %d candidate activities", len(downloads))
for activity in downloads:
await asyncio.to_thread(self._process_activity, activity)
def _process_activity(self, activity: DownloadedActivity) -> None:
try:
raw_hash = sha256_file(activity.raw_path)
self.store.record_downloaded(
source_ref=activity.source_ref,
title=activity.title,
source_url=activity.url,
raw_path=activity.raw_path,
raw_sha256=raw_hash,
)
if self.store.is_uploaded_hash(raw_hash):
self.store.mark_duplicate(
activity.source_ref,
"Raw file hash was already uploaded from another source.",
)
return
converted_path = self._converted_path(activity)
result = convert_fit_device(activity.raw_path, converted_path, self.device)
converted_hash = sha256_file(converted_path)
self.store.mark_converted(
activity.source_ref,
converted_path,
converted_hash,
result.patched_field_count,
)
if self.store.is_uploaded_hash(converted_hash):
self.store.mark_duplicate(
activity.source_ref,
"Converted file hash was already uploaded from another source.",
)
return
if self.settings.dry_run:
logger.info("Dry run enabled; not uploading %s", converted_path)
return
upload = self.uploader.upload(converted_path)
if upload.duplicate:
self.store.mark_duplicate(
activity.source_ref, "Garmin reported a duplicate activity."
)
else:
self.store.mark_uploaded(
activity.source_ref,
garmin_activity_id=upload.garmin_activity_id,
)
except GarminUploadBlocked as exc:
logger.error("Upload blocked: %s", exc)
self.store.mark_failed(activity.source_ref, str(exc))
except Exception as exc:
logger.exception("Failed processing %s", activity.raw_path)
self.store.mark_failed(activity.source_ref, str(exc))
def _converted_path(self, activity: DownloadedActivity) -> Path:
return self.settings.converted_dir / activity.raw_path.name

View File

@@ -0,0 +1,219 @@
from __future__ import annotations
import hashlib
import sqlite3
from contextlib import contextmanager
from datetime import UTC, datetime
from pathlib import Path
from typing import Iterator
TERMINAL_STATUSES = {"uploaded", "duplicate"}
def utc_now() -> str:
return datetime.now(UTC).isoformat(timespec="seconds")
def sha256_file(path: Path) -> str:
digest = hashlib.sha256()
with path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest()
class ActivityStore:
def __init__(self, db_path: Path) -> None:
self.db_path = db_path
def initialize(self) -> None:
self.db_path.parent.mkdir(parents=True, exist_ok=True)
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS activities (
source_ref TEXT PRIMARY KEY,
title TEXT,
source_url TEXT,
raw_path TEXT,
converted_path TEXT,
raw_sha256 TEXT,
converted_sha256 TEXT,
status TEXT NOT NULL,
error_message TEXT,
attempts INTEGER NOT NULL DEFAULT 0,
patched_field_count INTEGER,
garmin_activity_id TEXT,
first_seen_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
uploaded_at TEXT
)
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_activities_raw_sha ON activities(raw_sha256)"
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_activities_converted_sha
ON activities(converted_sha256)
"""
)
def is_terminal_source(self, source_ref: str) -> bool:
row = self._fetch_one(
"SELECT status FROM activities WHERE source_ref = ?", (source_ref,)
)
return bool(row and row["status"] in TERMINAL_STATUSES)
def is_uploaded_hash(self, file_hash: str) -> bool:
row = self._fetch_one(
"""
SELECT source_ref FROM activities
WHERE status IN ('uploaded', 'duplicate')
AND (raw_sha256 = ? OR converted_sha256 = ?)
LIMIT 1
""",
(file_hash, file_hash),
)
return row is not None
def record_downloaded(
self,
source_ref: str,
title: str,
source_url: str | None,
raw_path: Path,
raw_sha256: str,
) -> None:
now = utc_now()
with self._connect() as conn:
conn.execute(
"""
INSERT INTO activities (
source_ref, title, source_url, raw_path, raw_sha256, status,
first_seen_at, updated_at
)
VALUES (?, ?, ?, ?, ?, 'downloaded', ?, ?)
ON CONFLICT(source_ref) DO UPDATE SET
title = excluded.title,
source_url = excluded.source_url,
raw_path = excluded.raw_path,
raw_sha256 = excluded.raw_sha256,
status = CASE
WHEN activities.status IN ('uploaded', 'duplicate')
THEN activities.status
ELSE 'downloaded'
END,
error_message = NULL,
updated_at = excluded.updated_at
""",
(
source_ref,
title,
source_url,
str(raw_path),
raw_sha256,
now,
now,
),
)
def mark_converted(
self,
source_ref: str,
converted_path: Path,
converted_sha256: str,
patched_field_count: int,
) -> None:
self._execute(
"""
UPDATE activities
SET converted_path = ?,
converted_sha256 = ?,
patched_field_count = ?,
status = CASE
WHEN status IN ('uploaded', 'duplicate') THEN status
ELSE 'converted'
END,
error_message = NULL,
updated_at = ?
WHERE source_ref = ?
""",
(
str(converted_path),
converted_sha256,
patched_field_count,
utc_now(),
source_ref,
),
)
def mark_uploaded(self, source_ref: str, garmin_activity_id: str | None) -> None:
now = utc_now()
self._execute(
"""
UPDATE activities
SET status = 'uploaded',
garmin_activity_id = ?,
error_message = NULL,
uploaded_at = ?,
updated_at = ?
WHERE source_ref = ?
""",
(garmin_activity_id, now, now, source_ref),
)
def mark_duplicate(self, source_ref: str, message: str) -> None:
now = utc_now()
self._execute(
"""
UPDATE activities
SET status = 'duplicate',
error_message = ?,
uploaded_at = COALESCE(uploaded_at, ?),
updated_at = ?
WHERE source_ref = ?
""",
(message, now, now, source_ref),
)
def mark_failed(self, source_ref: str, message: str) -> None:
self._execute(
"""
UPDATE activities
SET status = 'failed',
error_message = ?,
attempts = attempts + 1,
updated_at = ?
WHERE source_ref = ?
""",
(message[:1000], utc_now(), source_ref),
)
def get_status(self, source_ref: str) -> str | None:
row = self._fetch_one(
"SELECT status FROM activities WHERE source_ref = ?", (source_ref,)
)
return row["status"] if row else None
@contextmanager
def _connect(self) -> Iterator[sqlite3.Connection]:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
finally:
conn.close()
def _execute(self, sql: str, params: tuple[object, ...]) -> None:
with self._connect() as conn:
conn.execute(sql, params)
def _fetch_one(
self, sql: str, params: tuple[object, ...]
) -> sqlite3.Row | None:
with self._connect() as conn:
return conn.execute(sql, params).fetchone()

45
tests/conftest.py Normal file
View File

@@ -0,0 +1,45 @@
from pathlib import Path
import pytest
from mywhoosh_garmin_sync.config import Settings
@pytest.fixture
def settings(tmp_path: Path) -> Settings:
return Settings(
mywhoosh_email="whoosh@example.com",
mywhoosh_password="whoosh-pass",
garmin_email="garmin@example.com",
garmin_password="garmin-pass",
poll_interval_seconds=3600,
data_dir=tmp_path,
raw_dir=tmp_path / "raw",
converted_dir=tmp_path / "converted",
browser_state_dir=tmp_path / "browser",
mywhoosh_auth_state_path=tmp_path / "mywhoosh_auth_state.json",
garmin_tokenstore=tmp_path / "garmin_tokens",
db_path=tmp_path / "state.sqlite3",
log_level="INFO",
dry_run=False,
dashboard_enabled=True,
dashboard_bind="127.0.0.1",
dashboard_port=8080,
mywhoosh_login_url="https://www.mywhoosh.com/login/",
mywhoosh_activity_url="https://www.mywhoosh.com/profile/",
mywhoosh_headless=True,
mywhoosh_timeout_seconds=1,
mywhoosh_max_downloads_per_run=10,
mywhoosh_download_text_hints=["fit", "download"],
mywhoosh_activities_button_text="ACTIVITIES",
mywhoosh_download_button_selector=".btnDownload",
mywhoosh_slow_mo_ms=0,
mywhoosh_manual_login_wait_seconds=0,
mywhoosh_debug_screenshots=False,
mywhoosh_debug_dir=tmp_path / "debug",
garmin_mfa_code=None,
target_garmin_manufacturer_id=1,
target_garmin_product_id=3578,
target_garmin_product_name="Edge 1030 Plus",
target_garmin_serial_number=None,
)

35
tests/test_config.py Normal file
View File

@@ -0,0 +1,35 @@
from pathlib import Path
from mywhoosh_garmin_sync.config import Settings
def test_settings_from_env(monkeypatch, tmp_path):
monkeypatch.setenv("MYWHOOSH_EMAIL", "whoosh@example.com")
monkeypatch.setenv("MYWHOOSH_PASSWORD", "whoosh-pass")
monkeypatch.setenv("GARMIN_EMAIL", "garmin@example.com")
monkeypatch.setenv("GARMIN_PASSWORD", "garmin-pass")
monkeypatch.setenv("DATA_DIR", str(tmp_path))
monkeypatch.setenv("DRY_RUN", "true")
monkeypatch.setenv("POLL_INTERVAL_SECONDS", "123")
monkeypatch.setenv("TARGET_GARMIN_SERIAL_NUMBER", "123456")
monkeypatch.setenv("MYWHOOSH_MANUAL_LOGIN_WAIT_SECONDS", "900")
settings = Settings.from_env()
assert settings.data_dir == tmp_path
assert settings.raw_dir == tmp_path / "raw"
assert settings.dry_run is True
assert settings.poll_interval_seconds == 123
assert settings.dashboard_enabled is True
assert settings.dashboard_bind == "0.0.0.0"
assert settings.dashboard_port == 8080
assert settings.target_garmin_product_id == 3578
assert settings.target_garmin_serial_number == 123456
assert settings.mywhoosh_manual_login_wait_seconds == 900
assert settings.mywhoosh_activities_button_text == "ACTIVITIES"
assert settings.mywhoosh_download_button_selector == ".btnDownload"
assert settings.mywhoosh_auth_state_path == tmp_path / "mywhoosh_auth_state.json"
settings.ensure_directories()
assert Path(settings.raw_dir).is_dir()
assert Path(settings.mywhoosh_debug_dir).is_dir()

119
tests/test_fit_device.py Normal file
View File

@@ -0,0 +1,119 @@
from pathlib import Path
import struct
import pytest
from mywhoosh_garmin_sync.fit_crc import fit_crc
from mywhoosh_garmin_sync.fit_device import (
DeviceFieldValue,
FitFormatError,
GarminDevice,
convert_fit_device,
read_device_field_values,
)
def test_convert_fit_device_patches_metadata_and_crc(tmp_path: Path):
source = tmp_path / "source.fit"
output = tmp_path / "output.fit"
source.write_bytes(_minimal_activity_fit())
result = convert_fit_device(
source,
output,
GarminDevice(product_id=3578, product_name="Edge 1030 Plus", serial_number=42),
)
assert result.patched_field_count == 8
values = read_device_field_values(output)
assert DeviceFieldValue(0, 1, 1) in values
assert DeviceFieldValue(0, 2, 3578) in values
assert DeviceFieldValue(0, 3, 42) in values
assert DeviceFieldValue(0, 8, "Edge 1030 Plus") in values
assert DeviceFieldValue(23, 2, 1) in values
assert DeviceFieldValue(23, 3, 42) in values
assert DeviceFieldValue(23, 4, 3578) in values
assert DeviceFieldValue(23, 27, "Edge 1030 Plus") in values
data = output.read_bytes()
assert struct.unpack_from("<H", data, 12)[0] == fit_crc(data[:12])
assert struct.unpack_from("<H", data, len(data) - 2)[0] == fit_crc(data[:-2])
def test_convert_fit_device_rejects_bad_crc(tmp_path: Path):
source = tmp_path / "bad.fit"
output = tmp_path / "output.fit"
data = bytearray(_minimal_activity_fit())
data[-1] ^= 0xFF
source.write_bytes(data)
with pytest.raises(FitFormatError):
convert_fit_device(source, output)
def _minimal_activity_fit() -> bytes:
data = bytearray()
data.extend(
_definition(
local=0,
global_message=0,
fields=[
(0, 1, 0x00),
(1, 2, 0x84),
(2, 2, 0x84),
(3, 4, 0x8C),
(8, 20, 0x07),
],
)
)
data.extend(b"\x00")
data.extend(struct.pack("<BHHI", 4, 32, 40, 999))
data.extend(_fit_string("MyWhoosh", 20))
data.extend(
_definition(
local=1,
global_message=23,
fields=[
(2, 2, 0x84),
(3, 4, 0x8C),
(4, 2, 0x84),
(27, 20, 0x07),
],
)
)
data.extend(b"\x01")
data.extend(struct.pack("<HIH", 32, 999, 40))
data.extend(_fit_string("Trainer", 20))
header = bytearray(14)
header[0] = 14
header[1] = 0x10
struct.pack_into("<H", header, 2, 0x08)
struct.pack_into("<I", header, 4, len(data))
header[8:12] = b".FIT"
struct.pack_into("<H", header, 12, fit_crc(header[:12]))
fit_file = header + data + b"\x00\x00"
struct.pack_into("<H", fit_file, len(fit_file) - 2, fit_crc(fit_file[:-2]))
return bytes(fit_file)
def _definition(
local: int, global_message: int, fields: list[tuple[int, int, int]]
) -> bytes:
result = bytearray()
result.append(0x40 | local)
result.append(0)
result.append(0)
result.extend(struct.pack("<H", global_message))
result.append(len(fields))
for field_num, size, base_type in fields:
result.extend(bytes([field_num, size, base_type]))
return bytes(result)
def _fit_string(value: str, size: int) -> bytes:
encoded = value.encode("utf-8")[: size - 1] + b"\x00"
return encoded + b"\x00" * (size - len(encoded))

59
tests/test_garmin.py Normal file
View File

@@ -0,0 +1,59 @@
from dataclasses import replace
import pytest
from mywhoosh_garmin_sync.garmin import GarminUploadBlocked, GarminUploader
class FakeGarminClient:
def __init__(self, *args, **kwargs):
self.prompt_mfa = kwargs.get("prompt_mfa")
self.logged_in = False
def login(self, tokenstore=None):
self.logged_in = True
def upload_activity(self, activity_path):
return {"activityId": 12345, "path": activity_path}
class DuplicateGarminClient(FakeGarminClient):
def upload_activity(self, activity_path):
raise RuntimeError("409 duplicate activity already exists")
def test_garmin_uploader_success(settings, tmp_path):
fit = tmp_path / "activity.fit"
fit.write_bytes(b"fit")
uploader = GarminUploader(settings, client_factory=FakeGarminClient)
result = uploader.upload(fit)
assert result.status == "uploaded"
assert result.garmin_activity_id == "12345"
def test_garmin_uploader_duplicate(settings, tmp_path):
fit = tmp_path / "activity.fit"
fit.write_bytes(b"fit")
uploader = GarminUploader(settings, client_factory=DuplicateGarminClient)
result = uploader.upload(fit)
assert result.duplicate is True
assert result.status == "duplicate"
def test_mfa_prompt_blocks_without_code(settings):
uploader = GarminUploader(settings, client_factory=FakeGarminClient)
with pytest.raises(GarminUploadBlocked):
uploader._prompt_mfa()
def test_mfa_prompt_returns_env_code(settings):
settings = replace(settings, garmin_mfa_code="123456")
uploader = GarminUploader(settings, client_factory=FakeGarminClient)
assert uploader._prompt_mfa() == "123456"

58
tests/test_mywhoosh.py Normal file
View File

@@ -0,0 +1,58 @@
from mywhoosh_garmin_sync.mywhoosh import (
_looks_like_challenge_text,
_looks_like_challenge_url,
_looks_like_cookie_accept_label,
_source_ref,
_storage_restore_script,
)
def test_cookie_accept_label_matching():
assert _looks_like_cookie_accept_label("Accept")
assert _looks_like_cookie_accept_label("Accept all cookies")
assert _looks_like_cookie_accept_label("Allow all")
assert _looks_like_cookie_accept_label("I agree")
assert _looks_like_cookie_accept_label("Got it")
assert not _looks_like_cookie_accept_label("Reject all")
assert not _looks_like_cookie_accept_label("Download app")
def test_challenge_text_matching():
assert _looks_like_challenge_text("I'm not a robot")
assert _looks_like_challenge_text("Verify you are human")
assert _looks_like_challenge_text("iframe title recaptcha")
assert _looks_like_challenge_text("Cloudflare security check")
assert not _looks_like_challenge_text("Welcome to your activities")
def test_challenge_url_matching_does_not_flag_plain_login():
assert _looks_like_challenge_url("https://example.test/cdn-cgi/challenge-platform")
assert _looks_like_challenge_url("https://example.test/verify")
assert not _looks_like_challenge_url("https://event.mywhoosh.com/login/")
assert not _looks_like_challenge_url("https://event.mywhoosh.com/user/activities")
def test_source_ref_prefers_href_then_row_text():
assert _source_ref("https://example.test/a.fit", "Download", 0, "Ride A") == _source_ref(
"https://example.test/a.fit", "Download", 99, "Ride B"
)
assert _source_ref(None, "", 0, "Ride A") != _source_ref(None, "", 1, "Ride B")
def test_storage_restore_script_contains_session_storage():
script = _storage_restore_script(
[
{
"origin": "https://event.mywhoosh.com",
"localStorage": [{"name": "token", "value": "local"}],
"sessionStorage": [{"name": "session-token", "value": "session"}],
}
]
)
assert "https://event.mywhoosh.com" in script
assert "localStorage" in script
assert "sessionStorage" in script

44
tests/test_state.py Normal file
View File

@@ -0,0 +1,44 @@
from pathlib import Path
from mywhoosh_garmin_sync.state import ActivityStore
def test_state_transitions_and_hash_lookup(tmp_path: Path):
store = ActivityStore(tmp_path / "state.sqlite3")
store.initialize()
store.record_downloaded(
source_ref="source-1",
title="Ride",
source_url="https://example.test/ride.fit",
raw_path=tmp_path / "raw.fit",
raw_sha256="raw-hash",
)
assert store.get_status("source-1") == "downloaded"
assert store.is_terminal_source("source-1") is False
store.mark_converted(
source_ref="source-1",
converted_path=tmp_path / "converted.fit",
converted_sha256="converted-hash",
patched_field_count=4,
)
assert store.get_status("source-1") == "converted"
assert store.is_uploaded_hash("converted-hash") is False
store.mark_uploaded("source-1", "123")
assert store.get_status("source-1") == "uploaded"
assert store.is_terminal_source("source-1") is True
assert store.is_uploaded_hash("raw-hash") is True
assert store.is_uploaded_hash("converted-hash") is True
def test_failed_activity_can_be_retried(tmp_path: Path):
store = ActivityStore(tmp_path / "state.sqlite3")
store.initialize()
store.record_downloaded("source-1", "Ride", None, tmp_path / "raw.fit", "hash")
store.mark_failed("source-1", "network failed")
assert store.get_status("source-1") == "failed"
assert store.is_terminal_source("source-1") is False