"""Files resource: ``/files`` endpoints (list/upload/download/move/delete)."""
from __future__ import annotations
import mimetypes
from collections.abc import Sequence
from pathlib import Path
from typing import IO, Any
from pydantic import BaseModel
from pypresscart.models._common import Paginated
from pypresscart.models.files import (
DeleteFileResponse,
File,
MoveFilesRequest,
MoveFilesResponse,
UploadFilesResponse,
)
from pypresscart.resources._base import ResourceBase
[docs]
class FilesResource(ResourceBase):
"""File upload and management."""
[docs]
def list(
self,
*,
limit: int = 25,
page: int = 1,
sort_by: str | None = None,
order_by: str | None = None,
q: str | None = None,
folder_id: str | None = None,
as_json: bool | None = None,
) -> Paginated[File] | dict[str, Any]:
"""List files. Required scope: ``files.lists``."""
params = {
"limit": limit,
"page": page,
"sort_by": sort_by,
"order_by": order_by,
"q": q,
"folder_id": folder_id,
}
payload = self._client._request("GET", "/files", params=params)
return self._parse_paginated(payload, File, as_json)
[docs]
def get(
self,
file_id: str,
*,
as_json: bool | None = None,
) -> File | dict[str, Any]:
"""Get a single file record. Required scope: ``files.read``."""
payload = self._client._request("GET", f"/files/{file_id}")
return self._parse(payload, File, as_json)
[docs]
def upload(
self,
files: str | Path | IO[bytes] | tuple[str, IO[bytes], str] | Sequence[Any],
*,
folder_id: str | None = None,
as_json: bool | None = None,
) -> UploadFilesResponse | dict[str, Any]:
"""Upload 1-5 files. Required scope: ``files.create``.
Accepts:
- a path (``str`` or :class:`pathlib.Path`)
- an open binary file handle
- a ``(filename, fileobj, content_type)`` tuple
- a list mixing any of the above
"""
items: Sequence[Any] = files if isinstance(files, list) else [files]
multipart: list[tuple[str, Any]] = []
opened: list[IO[bytes]] = []
try:
for item in items:
multipart.append(("files", _prepare_upload(item, opened)))
data: dict[str, Any] = {}
if folder_id is not None:
data["folder_id"] = folder_id
payload = self._client._request("POST", "/files/upload", data=data, files=multipart)
finally:
for fh in opened:
fh.close()
return self._parse(payload, UploadFilesResponse, as_json)
[docs]
def download(self, file_id: str) -> bytes:
"""Download the raw bytes of a file. Required scope: ``files.read``."""
return self._client._request_raw("GET", f"/files/{file_id}/download")
[docs]
def move(
self,
body: MoveFilesRequest | BaseModel | dict[str, Any],
*,
as_json: bool | None = None,
) -> MoveFilesResponse | dict[str, Any]:
"""Move files into a folder (or to root with ``folder_id=None``). Scope: ``files.update``."""
payload = self._client._request("POST", "/files/move", json=self._serialize(body))
return self._parse(payload, MoveFilesResponse, as_json)
[docs]
def delete(
self,
file_id: str,
*,
as_json: bool | None = None,
) -> DeleteFileResponse | dict[str, Any]:
"""Delete a file. Required scope: ``files.delete``."""
payload = self._client._request("DELETE", f"/files/{file_id}")
return self._parse(payload, DeleteFileResponse, as_json)
def _prepare_upload(
item: str | Path | IO[bytes] | tuple[str, IO[bytes], str],
opened: list[IO[bytes]],
) -> tuple[str, IO[bytes]] | tuple[str, IO[bytes], str]:
# Explicit tuple — caller provided the content type, trust it.
if isinstance(item, tuple):
return item
if isinstance(item, (str, Path)):
path = Path(item)
fh = path.open("rb")
opened.append(fh)
return (path.name, fh, _detect_mime(fh, path.name))
name = getattr(item, "name", "upload")
filename = Path(name).name
return (filename, item, _detect_mime(item, filename))
# ---- MIME detection --------------------------------------------------------
# Magic-byte signatures for the content types Presscart's upload endpoint
# accepts, plus a few adjacent ones (gif, bmp, tiff, zip) for good measure.
# Each entry is (offset, magic_bytes, mime_type). Checked in order; first
# match wins.
_MAGIC_SIGNATURES: tuple[tuple[int, bytes, str], ...] = (
(0, b"\xff\xd8\xff", "image/jpeg"),
(0, b"\x89PNG\r\n\x1a\n", "image/png"),
(0, b"GIF87a", "image/gif"),
(0, b"GIF89a", "image/gif"),
(0, b"BM", "image/bmp"),
(0, b"II*\x00", "image/tiff"),
(0, b"MM\x00*", "image/tiff"),
(0, b"%PDF-", "application/pdf"),
# DOC (old binary Office format): OLE compound file header.
(0, b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1", "application/msword"),
)
_DOCX_MIME = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
def _is_riff_webp(head: bytes) -> bool:
return len(head) >= 12 and head[0:4] == b"RIFF" and head[8:12] == b"WEBP"
def _looks_like_text(head: bytes) -> bool:
"""Heuristic: all bytes in the sniff window are printable / common whitespace."""
if not head:
return False
allowed = {0x09, 0x0A, 0x0D} # tab, LF, CR
return all(b in allowed or 0x20 <= b < 0x7F for b in head)
def _sniff_mime(head: bytes, filename: str) -> str | None:
"""Identify a MIME type from magic bytes, with a few heuristic extras.
Returns ``None`` if no reliable sniff is possible — callers should then
fall back to extension-based guessing.
"""
for offset, magic, mime in _MAGIC_SIGNATURES:
if head[offset : offset + len(magic)] == magic:
return mime
if _is_riff_webp(head):
return "image/webp"
# ZIP container — could be docx/xlsx/pptx/etc. or a plain zip. Use the
# filename extension to narrow it; fall back to application/zip.
if head[:4] == b"PK\x03\x04":
ext = Path(filename).suffix.lower()
if ext == ".docx":
return _DOCX_MIME
if ext == ".xlsx":
return "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
if ext == ".pptx":
return "application/vnd.openxmlformats-officedocument.presentationml.presentation"
return "application/zip"
if _looks_like_text(head):
return "text/plain"
return None
def _detect_mime(source: IO[bytes], filename: str) -> str:
"""Detect the MIME type of an upload source.
Precedence:
1. Magic-byte sniff of the first 64 bytes (canonical).
2. :func:`mimetypes.guess_type` on the filename.
3. ``application/octet-stream`` as a final fallback.
After reading, the stream is rewound to its original position so the
subsequent multipart upload sees the full content. Non-seekable streams
fall through to extension-based guessing only.
"""
sniffed: str | None = None
if hasattr(source, "seek") and hasattr(source, "tell"):
try:
pos = source.tell()
head = source.read(64)
source.seek(pos)
sniffed = _sniff_mime(head, filename)
except (OSError, ValueError):
# Non-seekable stream; fall through to extension.
sniffed = None
if sniffed:
return sniffed
guessed, _ = mimetypes.guess_type(filename)
return guessed or "application/octet-stream"