Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 77 additions & 2 deletions src/boost_weblate/endpoint/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
boost_validation_errors,
to_error_dict,
)
from boost_weblate.endpoint.validators import validate_repo_segment
from boost_weblate.endpoint.validators import (
MAX_ADD_OR_UPDATE_LANGS,
MAX_SUBMODULES_PER_LANG,
validate_language_code,
validate_repo_segment,
)


class DrfValidationCode(StrEnum):
Expand Down Expand Up @@ -190,15 +195,59 @@ def validate_organization(self, value: str) -> str:
)
raise serializers.ValidationError(str(exc)) from exc

def validate_version(self, value: str) -> str:
    """Validate ``version`` with the shared repo-segment rules.

    Returns the value handed back by ``validate_repo_segment`` when it is
    accepted. On rejection the failure is recorded on the serializer as a
    structured ``INVALID_CLONE_URL`` error tagged with the ``version``
    field, and a DRF ``ValidationError`` carrying the same message is
    raised.
    """
    try:
        checked = validate_repo_segment(value, field="version")
    except ValidationError as exc:
        # Mark the field as custom-handled before queuing the structured error.
        self._custom_error_fields.add(RequestField.VERSION)
        failure = (
            BoostEndpointErrorCode.INVALID_CLONE_URL,
            str(exc),
            {"field": RequestField.VERSION},
        )
        self._custom_validation_errors.extend(boost_validation_errors([failure]))
        raise serializers.ValidationError(str(exc)) from exc
    return checked

def validate_extensions(self, value: list[str] | None) -> list[str] | None:
    """Strip entries and remove blanks so all-empty input does not filter files.

    Args:
        value: The submitted ``extensions`` list, or ``None`` when absent.

    Returns:
        The stripped, non-blank entries in their original order, or ``None``
        when ``value`` is ``None`` or contains no non-blank entry.

    Raises:
        serializers.ValidationError: If any entry is not a string.
    """
    if value is None:
        return None
    # NOTE: the stale one-line ``return [v.strip() ...]`` was removed here; it
    # short-circuited the method, so the type check below never ran and
    # all-blank input came back as ``[]`` instead of ``None``.
    cleaned: list[str] = []
    for entry in value:
        if not isinstance(entry, str):
            raise serializers.ValidationError(
                "Each extension must be a string.",
                code=DrfValidationCode.NOT_A_LIST,
            )
        stripped = entry.strip()
        if stripped:
            cleaned.append(stripped)
    # An empty result is collapsed to None (same value as a missing field).
    return cleaned or None

def validate_add_or_update(self, value: dict[str, Any]) -> dict[str, Any]:
"""Require non-empty string language keys and non-empty submodule lists."""
items: list[tuple[BoostEndpointErrorCode, str, dict[str, Any]]] = []
if len(value) > MAX_ADD_OR_UPDATE_LANGS:
items.append(
(
BoostEndpointErrorCode.INVALID_LANGUAGE_CODE,
(
f"add_or_update: exceeds maximum of "
f"{MAX_ADD_OR_UPDATE_LANGS} language keys "
f"(got {len(value)})."
),
{"field": RequestField.ADD_OR_UPDATE},
)
)
Comment thread
whisper67265 marked this conversation as resolved.
self._custom_error_fields.add(RequestField.ADD_OR_UPDATE)
self._custom_validation_errors.extend(boost_validation_errors(items))
raise serializers.ValidationError({RequestField.ADD_OR_UPDATE: "invalid"})
for lang_code, submodules in value.items():
if not isinstance(lang_code, str) or lang_code.strip() == "":
items.append(
Expand All @@ -215,6 +264,20 @@ def validate_add_or_update(self, value: dict[str, Any]) -> dict[str, Any]:
)
)
continue
try:
validate_language_code(lang_code)
except ValidationError as exc:
items.append(
(
BoostEndpointErrorCode.INVALID_LANGUAGE_CODE,
str(exc),
{
"field": RequestField.ADD_OR_UPDATE,
"language": lang_code,
},
)
)
continue
if not isinstance(submodules, list):
items.append(
(
Expand All @@ -238,6 +301,18 @@ def validate_add_or_update(self, value: dict[str, Any]) -> dict[str, Any]:
{"field": RequestField.ADD_OR_UPDATE, "language": lang_code},
)
)
elif len(submodules) > MAX_SUBMODULES_PER_LANG:
items.append(
(
BoostEndpointErrorCode.INVALID_SUBMODULE_LIST,
(
f"add_or_update: key {lang_code!r} exceeds maximum of "
f"{MAX_SUBMODULES_PER_LANG} submodules "
f"(got {len(submodules)})."
),
{"field": RequestField.ADD_OR_UPDATE, "language": lang_code},
)
)
else:
for submodule in submodules:
if not isinstance(submodule, str):
Expand Down
47 changes: 40 additions & 7 deletions src/boost_weblate/endpoint/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,55 @@
from django.conf import settings
from django.core.exceptions import ValidationError

# Longest value (in characters) that _validate_segment accepts.
MAX_SEGMENT_LEN = 256
# Cap on the number of language keys in one add_or_update mapping.
MAX_ADD_OR_UPDATE_LANGS = 50
# Cap on the number of submodules listed under a single language key.
MAX_SUBMODULES_PER_LANG = 100

# Repo path segments: ASCII letters, digits, '.', '_' and '-' only.
_REPO_SEGMENT_RE = re.compile(r"^[A-Za-z0-9._-]+$")
# Language codes: ASCII letters, digits, '_' and '-' only (no '.').
_LANGUAGE_CODE_RE = re.compile(r"^[a-zA-Z0-9_-]+$")

# SCP-style SSH: git@host:path/to/repo.git
# Group 1 captures the host, group 2 the path after the colon.
_SCP_SSH_RE = re.compile(r"^git@([^:/]+):(.+)$")


def _validate_segment(
    value: str,
    *,
    field: str,
    pattern: re.Pattern[str],
    allowed_chars: str,
) -> str:
    """Validate one identifier-like segment and return it unchanged.

    Checks run in order: non-blank, at most ``MAX_SEGMENT_LEN`` characters,
    then a full match against ``pattern``.

    Args:
        value: The candidate segment.
        field: Field name used as the prefix of every error message.
        pattern: Compiled pattern the whole of ``value`` must match.
        allowed_chars: Human-readable list of permitted characters, quoted
            in the invalid-characters message.

    Returns:
        ``value`` as given (it is not stripped or otherwise normalized).

    Raises:
        ValidationError: If ``value`` is empty or whitespace-only, too long,
            or does not fully match ``pattern``.
    """
    if not value or not value.strip():
        raise ValidationError(f"{field}: must be a non-empty string")
    # Length is checked before the pattern so oversized input is rejected
    # without being echoed back in the invalid-characters message.
    if len(value) > MAX_SEGMENT_LEN:
        raise ValidationError(
            f"{field}: exceeds maximum length of {MAX_SEGMENT_LEN} characters"
        )
    if not pattern.fullmatch(value):
        raise ValidationError(
            f"{field}: invalid characters in {value!r}; allowed: {allowed_chars}"
        )
    return value


def validate_repo_segment(name: str, *, field: str) -> str:
    """Check that *name* is a safe organization/submodule path segment.

    Delegates to ``_validate_segment`` with the repo-segment pattern and
    returns whatever that helper returns; ``field`` labels any error.
    """
    repo_chars = "letters, digits, '.', '_', '-'"
    return _validate_segment(
        name, field=field, pattern=_REPO_SEGMENT_RE, allowed_chars=repo_chars
    )


def validate_language_code(code: str) -> str:
    """Check that *code* is a safe Weblate-style language identifier.

    Delegates to ``_validate_segment`` with the language-code pattern;
    errors are reported under the field name ``language``.
    """
    language_chars = "letters, digits, '_', '-'"
    return _validate_segment(
        code,
        field="language",
        pattern=_LANGUAGE_CODE_RE,
        allowed_chars=language_chars,
    )


def _normalize_clone_url(url: str) -> str:
Expand Down
119 changes: 119 additions & 0 deletions tests/endpoint/test_serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,122 @@ def test_invalid_organization_still_flattens_other_drf_errors() -> None:
assert org_errors[0]["code"] == BoostEndpointErrorCode.INVALID_CLONE_URL.value
assert len(version_errors) == 1
assert version_errors[0]["metadata"]["drf_code"] == "required"


def test_add_or_update_serializer_rejects_invalid_version() -> None:
    """A ``../evil`` version fails with exactly one version-field error."""
    payload = {
        "organization": "o",
        "version": "../evil",
        "add_or_update": {"zh_Hans": ["json"]},
    }
    serializer = AddOrUpdateRequestSerializer(data=payload)

    assert not serializer.is_valid()

    codes = _error_codes(serializer.structured_errors)
    assert BoostEndpointErrorCode.INVALID_CLONE_URL.value in codes

    version_error_count = sum(
        1
        for err in serializer.structured_errors
        if err["metadata"]["field"] == "version"
    )
    assert version_error_count == 1


def test_add_or_update_serializer_rejects_sql_injection_lang_code() -> None:
    """A language key with quote/semicolon characters is an invalid code."""
    payload = {
        "organization": "o",
        "version": "v",
        "add_or_update": {"'; DROP TABLE--": ["json"]},
    }
    serializer = AddOrUpdateRequestSerializer(data=payload)

    assert not serializer.is_valid()

    codes = _error_codes(serializer.structured_errors)
    assert BoostEndpointErrorCode.INVALID_LANGUAGE_CODE.value in codes


def test_add_or_update_serializer_rejects_whitespace_lang_code() -> None:
    """A whitespace-only language key is reported as an invalid code."""
    payload = {
        "organization": "o",
        "version": "v",
        "add_or_update": {" ": ["json"]},
    }
    serializer = AddOrUpdateRequestSerializer(data=payload)

    assert not serializer.is_valid()

    codes = _error_codes(serializer.structured_errors)
    assert BoostEndpointErrorCode.INVALID_LANGUAGE_CODE.value in codes


def test_add_or_update_serializer_rejects_non_string_submodule() -> None:
    """A nested list in place of a submodule name fails on ``add_or_update``."""
    payload = {
        "organization": "o",
        "version": "v",
        "add_or_update": {"zh_Hans": [["json"]]},
    }
    serializer = AddOrUpdateRequestSerializer(data=payload)

    assert not serializer.is_valid()

    flagged = any(
        err["metadata"]["field"] == "add_or_update"
        for err in serializer.structured_errors
    )
    assert flagged


def test_add_or_update_serializer_rejects_oversized_organization() -> None:
    """A 10,000-character organization is rejected with INVALID_CLONE_URL."""
    oversized = "o" * 10_000
    payload = {
        "organization": oversized,
        "version": "v",
        "add_or_update": {"zh_Hans": ["json"]},
    }
    serializer = AddOrUpdateRequestSerializer(data=payload)

    assert not serializer.is_valid()

    codes = _error_codes(serializer.structured_errors)
    assert BoostEndpointErrorCode.INVALID_CLONE_URL.value in codes


def test_add_or_update_serializer_rejects_too_many_languages() -> None:
    """One language key over the cap is rejected with INVALID_LANGUAGE_CODE."""
    from boost_weblate.endpoint.validators import MAX_ADD_OR_UPDATE_LANGS

    # Build exactly one more language entry than the limit allows.
    over_limit = MAX_ADD_OR_UPDATE_LANGS + 1
    langs = {f"lang{i}": ["json"] for i in range(over_limit)}
    payload = {
        "organization": "o",
        "version": "v",
        "add_or_update": langs,
    }
    serializer = AddOrUpdateRequestSerializer(data=payload)

    assert not serializer.is_valid()

    codes = _error_codes(serializer.structured_errors)
    assert BoostEndpointErrorCode.INVALID_LANGUAGE_CODE.value in codes


def test_add_or_update_serializer_rejects_too_many_submodules() -> None:
    """One submodule over the per-language cap gives INVALID_SUBMODULE_LIST."""
    from boost_weblate.endpoint.validators import MAX_SUBMODULES_PER_LANG

    # Build exactly one more submodule name than the limit allows.
    over_limit = MAX_SUBMODULES_PER_LANG + 1
    submodules = [f"mod{i}" for i in range(over_limit)]
    payload = {
        "organization": "o",
        "version": "v",
        "add_or_update": {"zh_Hans": submodules},
    }
    serializer = AddOrUpdateRequestSerializer(data=payload)

    assert not serializer.is_valid()

    codes = _error_codes(serializer.structured_errors)
    assert BoostEndpointErrorCode.INVALID_SUBMODULE_LIST.value in codes


def test_add_or_update_serializer_rejects_extensions_dict() -> None:
    """A dict supplied for ``extensions`` fails with an extensions-field error."""
    payload = {
        "organization": "o",
        "version": "v",
        "add_or_update": {"zh_Hans": ["json"]},
        "extensions": {".md": True},
    }
    serializer = AddOrUpdateRequestSerializer(data=payload)

    assert not serializer.is_valid()

    flagged = any(
        err["metadata"]["field"] == "extensions"
        for err in serializer.structured_errors
    )
    assert flagged
Loading