🎉 Initialize module repository
This commit is contained in:
6
models/__init__.py
Normal file
6
models/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from . import mvd_tcg_card
|
||||
from . import mvd_tcg_mtg_card_image
|
||||
from . import mvd_tcg_mtg_scryfall_import_run
|
||||
from . import mvd_tcg_mtg_scryfall_api
|
||||
from . import mvd_tcg_mtg_set
|
||||
from . import res_config_settings
|
||||
6
models/constants.py
Normal file
6
models/constants.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""Shared constants for the MTG Scryfall connector."""
|
||||
|
||||
DEFAULT_SCRYFALL_API_BASE_URL = "https://api.scryfall.com"
|
||||
DEFAULT_SCRYFALL_TIMEOUT_SECONDS = 30
|
||||
DEFAULT_SCRYFALL_USER_AGENT = "Mantjeverse Digital TCG/1.0"
|
||||
DEFAULT_SCRYFALL_IMPORT_LANGUAGES = ("en", "de")
|
||||
801
models/mvd_tcg_card.py
Normal file
801
models/mvd_tcg_card.py
Normal file
@@ -0,0 +1,801 @@
|
||||
"""Scryfall-specific MTG card import helpers for the MTG connector."""
|
||||
|
||||
import re
|
||||
|
||||
from odoo import _, Command, api, fields, models
|
||||
|
||||
KEYWORD_CODE_PATTERN = re.compile(r"[^a-z0-9]+")
|
||||
|
||||
|
||||
class MvdTcgCard(models.Model):
|
||||
"""Extend MTG card references with Scryfall connector metadata."""
|
||||
|
||||
_inherit = "mvd.tcg.card"
|
||||
|
||||
mtg_scryfall_id = fields.Char(string="Scryfall ID", index=True)
|
||||
mtg_scryfall_uri = fields.Char(string="Scryfall URL", copy=False, readonly=True)
|
||||
mtg_scryfall_last_import_run_id = fields.Many2one(
|
||||
"mvd.tcg.mtg.scryfall.import.run",
|
||||
string="Last Import Run",
|
||||
copy=False,
|
||||
index=True,
|
||||
ondelete="set null",
|
||||
readonly=True,
|
||||
)
|
||||
mtg_scryfall_last_synced_at = fields.Datetime(
|
||||
string="Last Synced At",
|
||||
copy=False,
|
||||
readonly=True,
|
||||
)
|
||||
mtg_scryfall_image_ids = fields.One2many(
|
||||
"mvd.tcg.mtg.card.image",
|
||||
"card_id",
|
||||
string="Localized Images",
|
||||
copy=False,
|
||||
readonly=True,
|
||||
)
|
||||
mtg_display_image_1920 = fields.Image(
|
||||
string="Localized Image",
|
||||
compute="_compute_mtg_display_images",
|
||||
max_width=1920,
|
||||
max_height=1920,
|
||||
)
|
||||
mtg_display_image_512 = fields.Image(
|
||||
string="Localized Preview Image",
|
||||
compute="_compute_mtg_display_images",
|
||||
max_width=512,
|
||||
max_height=512,
|
||||
)
|
||||
|
||||
def _mvd_tcg_get_deck_image_binary(self):
|
||||
"""Prefer localized display images for deck previews.
|
||||
|
||||
Returns:
|
||||
str | bool: Base64 image data for deck-related previews.
|
||||
"""
|
||||
self.ensure_one()
|
||||
return (
|
||||
self.mtg_display_image_512
|
||||
or self.mtg_display_image_1920
|
||||
or super()._mvd_tcg_get_deck_image_binary()
|
||||
)
|
||||
|
||||
def _mtg_scryfall_get_refresh_query(self):
|
||||
"""Return the preferred lookup query for a Scryfall refresh.
|
||||
|
||||
Returns:
|
||||
str | bool: URL or identifier reusable by the shared lookup parser.
|
||||
"""
|
||||
self.ensure_one()
|
||||
if self.mtg_scryfall_uri:
|
||||
return self.mtg_scryfall_uri
|
||||
if self.mtg_scryfall_id:
|
||||
return self.mtg_scryfall_id
|
||||
if self.mtg_set_code and self.mtg_collector_number:
|
||||
return (
|
||||
f"https://api.scryfall.com/cards/"
|
||||
f"{self.mtg_set_code}/{self.mtg_collector_number}"
|
||||
)
|
||||
return False
|
||||
|
||||
@api.model
|
||||
def _mtg_scryfall_build_external_ref(self, payload):
|
||||
"""Build the stable MTG print reference used by the connector.
|
||||
|
||||
Args:
|
||||
payload: Raw Scryfall card payload.
|
||||
|
||||
Returns:
|
||||
str: Stable reference in ``set_code:collector_number`` form.
|
||||
"""
|
||||
set_code = (payload.get("set") or "").strip().lower()
|
||||
collector_number = (payload.get("collector_number") or "").strip().lower()
|
||||
return f"{set_code}:{collector_number}"
|
||||
|
||||
@api.model
|
||||
def _mtg_scryfall_select_primary_payload(self, payloads):
|
||||
"""Return the payload that should drive primary MTG field values.
|
||||
|
||||
Args:
|
||||
payloads: Raw localized Scryfall payloads for one print group.
|
||||
|
||||
Returns:
|
||||
dict: Primary payload, preferring English when available.
|
||||
"""
|
||||
valid_payloads = [
|
||||
payload
|
||||
for payload in (payloads or [])
|
||||
if payload.get("object") == "card" and payload.get("id")
|
||||
]
|
||||
if not valid_payloads:
|
||||
return {}
|
||||
|
||||
english_payload = next(
|
||||
(
|
||||
payload
|
||||
for payload in valid_payloads
|
||||
if (payload.get("lang") or "").strip().lower() == "en"
|
||||
),
|
||||
False,
|
||||
)
|
||||
return english_payload or valid_payloads[0]
|
||||
|
||||
@api.model
|
||||
def _mtg_scryfall_extract_image_url(self, payload):
|
||||
"""Return the best image URL available for a Scryfall payload.
|
||||
|
||||
Args:
|
||||
payload: Raw Scryfall card payload.
|
||||
|
||||
Returns:
|
||||
str | bool: Image URL or ``False`` when none is available.
|
||||
"""
|
||||
image_uris = payload.get("image_uris") or {}
|
||||
if image_uris.get("normal"):
|
||||
return image_uris["normal"]
|
||||
|
||||
for face_payload in payload.get("card_faces") or []:
|
||||
face_image_uris = face_payload.get("image_uris") or {}
|
||||
if face_image_uris.get("normal"):
|
||||
return face_image_uris["normal"]
|
||||
return False
|
||||
|
||||
@api.model
|
||||
def _mtg_scryfall_load_group_image_base64(self, payloads):
|
||||
"""Load the best available card image for one localized print group.
|
||||
|
||||
Args:
|
||||
payloads: Raw localized Scryfall payloads for one print group.
|
||||
|
||||
Returns:
|
||||
str | bool: Base64-encoded image data or ``False`` when unavailable.
|
||||
"""
|
||||
scryfall_api = self.env["mvd.tcg.mtg.scryfall.api"]
|
||||
for payload in payloads or ():
|
||||
image_base64 = scryfall_api.load_image_base64(
|
||||
self._mtg_scryfall_extract_image_url(payload)
|
||||
)
|
||||
if image_base64:
|
||||
return image_base64
|
||||
return False
|
||||
|
||||
@api.model
|
||||
def _mtg_scryfall_collect_group_images(self, payloads):
|
||||
"""Collect localized imported images for one Scryfall print group.
|
||||
|
||||
Args:
|
||||
payloads: Raw localized Scryfall payloads for one print group.
|
||||
|
||||
Returns:
|
||||
dict[str, str]: Base64 image data keyed by Scryfall language code.
|
||||
"""
|
||||
scryfall_api = self.env["mvd.tcg.mtg.scryfall.api"]
|
||||
collected_images = {}
|
||||
for payload in payloads or ():
|
||||
language_code = (payload.get("lang") or "").strip().lower() or "en"
|
||||
if language_code in collected_images:
|
||||
continue
|
||||
image_base64 = scryfall_api.load_image_base64(
|
||||
self._mtg_scryfall_extract_image_url(payload)
|
||||
)
|
||||
if image_base64:
|
||||
collected_images[language_code] = image_base64
|
||||
return collected_images
|
||||
|
||||
@api.model
|
||||
def _mtg_scryfall_extract_card_type_codes(self, payload):
|
||||
"""Extract normalized MTG card type codes from a Scryfall payload.
|
||||
|
||||
Args:
|
||||
payload: Raw Scryfall card payload.
|
||||
|
||||
Returns:
|
||||
list[str]: Known MTG card type codes present on the card.
|
||||
"""
|
||||
known_codes = {
|
||||
card_type.code: card_type.code
|
||||
for card_type in self.env["mvd.tcg.mtg.card.type"].search([])
|
||||
}
|
||||
raw_type_values = [payload.get("type_line")]
|
||||
raw_type_values.extend(
|
||||
face_payload.get("type_line")
|
||||
for face_payload in payload.get("card_faces") or []
|
||||
)
|
||||
extracted_codes = []
|
||||
for raw_type_value in raw_type_values:
|
||||
normalized_value = (raw_type_value or "").replace("—", " ")
|
||||
for token in normalized_value.split():
|
||||
type_code = token.strip().lower()
|
||||
if type_code in known_codes and type_code not in extracted_codes:
|
||||
extracted_codes.append(type_code)
|
||||
return extracted_codes
|
||||
|
||||
@api.model
|
||||
def _mtg_scryfall_extract_face_payloads(self, payload):
|
||||
"""Return ordered card-face payloads from one Scryfall payload.
|
||||
|
||||
Args:
|
||||
payload: Raw Scryfall card payload.
|
||||
|
||||
Returns:
|
||||
list[dict]: Ordered face payloads, or an empty list for single-face cards.
|
||||
"""
|
||||
return [
|
||||
face_payload
|
||||
for face_payload in (payload.get("card_faces") or [])
|
||||
if isinstance(face_payload, dict)
|
||||
]
|
||||
|
||||
@api.model
|
||||
def _mtg_scryfall_normalize_keyword_code(self, keyword_name):
|
||||
"""Build a stable code for one MTG keyword.
|
||||
|
||||
Args:
|
||||
keyword_name: Human keyword such as ``Double strike``.
|
||||
|
||||
Returns:
|
||||
str: Normalized keyword code.
|
||||
"""
|
||||
normalized_keyword = KEYWORD_CODE_PATTERN.sub(
|
||||
"-",
|
||||
(keyword_name or "").strip().lower(),
|
||||
).strip("-")
|
||||
return normalized_keyword or False
|
||||
|
||||
@api.model
|
||||
def _mtg_scryfall_resolve_taxonomy_records(
|
||||
self,
|
||||
model_name,
|
||||
codes,
|
||||
*,
|
||||
display_names=None,
|
||||
create_missing=False,
|
||||
):
|
||||
"""Resolve normalized taxonomy records from codes.
|
||||
|
||||
Args:
|
||||
model_name: Technical taxonomy model name.
|
||||
codes: Raw taxonomy codes.
|
||||
display_names: Optional display names keyed by normalized code.
|
||||
create_missing: Whether unknown codes should be created.
|
||||
|
||||
Returns:
|
||||
Model: Matching taxonomy records.
|
||||
"""
|
||||
normalized_codes = []
|
||||
for code in codes or []:
|
||||
normalized_code = (code or "").strip().lower()
|
||||
if normalized_code and normalized_code not in normalized_codes:
|
||||
normalized_codes.append(normalized_code)
|
||||
if not normalized_codes:
|
||||
return self.env[model_name]
|
||||
|
||||
taxonomy_model = self.env[model_name]
|
||||
taxonomy_records = taxonomy_model.search([("code", "in", normalized_codes)])
|
||||
records_by_code = {record.code: record for record in taxonomy_records}
|
||||
if create_missing:
|
||||
for normalized_code in normalized_codes:
|
||||
if normalized_code in records_by_code:
|
||||
continue
|
||||
display_name = (display_names or {}).get(normalized_code) or normalized_code
|
||||
taxonomy_record = taxonomy_model.create(
|
||||
{
|
||||
"name": display_name,
|
||||
"code": normalized_code,
|
||||
}
|
||||
)
|
||||
records_by_code[normalized_code] = taxonomy_record
|
||||
return taxonomy_model.browse(
|
||||
[records_by_code[code].id for code in normalized_codes if code in records_by_code]
|
||||
)
|
||||
|
||||
@api.model
|
||||
def _mtg_scryfall_map_translation_language_codes(self, scryfall_language_code):
|
||||
"""Map one Scryfall language code to active Odoo language codes.
|
||||
|
||||
Args:
|
||||
scryfall_language_code: Raw Scryfall language code such as ``de``.
|
||||
|
||||
Returns:
|
||||
list[str]: Active Odoo language codes for translation updates.
|
||||
"""
|
||||
return self.env["mvd.tcg.mtg.scryfall.api"].map_translation_language_codes(
|
||||
scryfall_language_code
|
||||
)
|
||||
|
||||
def _mtg_scryfall_update_translations_from_payload(self, payload):
|
||||
"""Update translated MTG card fields from one localized payload.
|
||||
|
||||
Args:
|
||||
payload: Raw localized Scryfall card payload.
|
||||
|
||||
Returns:
|
||||
None: The record is updated in place.
|
||||
"""
|
||||
self.ensure_one()
|
||||
card = self.with_context(
|
||||
mvd_tcg_bypass_validated_write=True,
|
||||
mvd_tcg_bypass_external_ref_write=True,
|
||||
)
|
||||
|
||||
language_codes = card._mtg_scryfall_map_translation_language_codes(
|
||||
payload.get("lang")
|
||||
)
|
||||
if not language_codes:
|
||||
return
|
||||
|
||||
translated_values = {
|
||||
"name": payload.get("printed_name") or payload.get("name") or False,
|
||||
"mtg_type_line": payload.get("printed_type_line") or payload.get("type_line") or False,
|
||||
"mtg_oracle_text": payload.get("printed_text") or payload.get("oracle_text") or False,
|
||||
"mtg_flavor_text": payload.get("flavor_text") or False,
|
||||
}
|
||||
for field_name, field_value in translated_values.items():
|
||||
if field_value:
|
||||
card.update_field_translations(
|
||||
field_name,
|
||||
{
|
||||
language_code: field_value
|
||||
for language_code in language_codes
|
||||
},
|
||||
)
|
||||
|
||||
def _mtg_scryfall_update_face_translations_from_payload(self, payload):
|
||||
"""Update translated face fields from one localized Scryfall payload.
|
||||
|
||||
Args:
|
||||
payload: Raw localized Scryfall card payload.
|
||||
|
||||
Returns:
|
||||
None: The record is updated in place.
|
||||
"""
|
||||
self.ensure_one()
|
||||
|
||||
language_codes = self._mtg_scryfall_map_translation_language_codes(
|
||||
payload.get("lang")
|
||||
)
|
||||
if not language_codes:
|
||||
return
|
||||
|
||||
localized_faces = self._mtg_scryfall_extract_face_payloads(payload)
|
||||
if not localized_faces:
|
||||
return
|
||||
|
||||
faces_by_sequence = {face.sequence: face for face in self.mtg_face_ids}
|
||||
for index, face_payload in enumerate(localized_faces, start=1):
|
||||
face = faces_by_sequence.get(index)
|
||||
if not face:
|
||||
continue
|
||||
translated_values = {
|
||||
"name": face_payload.get("printed_name") or face_payload.get("name") or False,
|
||||
"type_line": face_payload.get("printed_type_line") or face_payload.get("type_line") or False,
|
||||
"oracle_text": face_payload.get("printed_text") or face_payload.get("oracle_text") or False,
|
||||
"flavor_text": face_payload.get("flavor_text") or False,
|
||||
}
|
||||
for field_name, field_value in translated_values.items():
|
||||
if field_value:
|
||||
face.with_context(
|
||||
mvd_tcg_bypass_validated_write=True,
|
||||
mvd_tcg_bypass_external_ref_write=True,
|
||||
).update_field_translations(
|
||||
field_name,
|
||||
{
|
||||
language_code: field_value
|
||||
for language_code in language_codes
|
||||
},
|
||||
)
|
||||
|
||||
def _mtg_scryfall_get_preferred_image_language_codes(self):
|
||||
"""Return preferred Scryfall image languages for the current UI context.
|
||||
|
||||
Returns:
|
||||
list[str]: Ordered Scryfall language codes with fallbacks.
|
||||
"""
|
||||
self.ensure_one()
|
||||
|
||||
scryfall_api = self.env["mvd.tcg.mtg.scryfall.api"]
|
||||
current_odoo_language_code = self.env.context.get("lang") or self.env.user.lang
|
||||
language_codes = []
|
||||
|
||||
preferred_scryfall_code = scryfall_api.map_scryfall_language_code(
|
||||
current_odoo_language_code
|
||||
)
|
||||
if preferred_scryfall_code:
|
||||
language_codes.append(preferred_scryfall_code)
|
||||
if "en" not in language_codes:
|
||||
language_codes.append("en")
|
||||
return language_codes
|
||||
|
||||
def _mtg_scryfall_sync_localized_images(self, localized_images):
|
||||
"""Create or update localized imported images for the current card.
|
||||
|
||||
Args:
|
||||
localized_images: Base64 image data keyed by Scryfall language code.
|
||||
|
||||
Returns:
|
||||
None: The record is updated in place.
|
||||
"""
|
||||
self.ensure_one()
|
||||
card = self.with_context(
|
||||
mvd_tcg_bypass_validated_write=True,
|
||||
mvd_tcg_bypass_external_ref_write=True,
|
||||
)
|
||||
|
||||
image_model = self.env["mvd.tcg.mtg.card.image"]
|
||||
existing_images = {
|
||||
(image.language_code or "").strip().lower(): image
|
||||
for image in card.mtg_scryfall_image_ids
|
||||
}
|
||||
for language_code, image_base64 in (localized_images or {}).items():
|
||||
normalized_language_code = (language_code or "").strip().lower()
|
||||
if not normalized_language_code or not image_base64:
|
||||
continue
|
||||
|
||||
existing_image = existing_images.get(normalized_language_code)
|
||||
values = {
|
||||
"card_id": self.id,
|
||||
"language_code": normalized_language_code,
|
||||
"image_1920": image_base64,
|
||||
}
|
||||
if existing_image:
|
||||
existing_image.write({"image_1920": image_base64})
|
||||
else:
|
||||
image_model.create(values)
|
||||
|
||||
def _mtg_scryfall_sync_faces_from_payloads(self, payloads):
|
||||
"""Create or update ordered MTG card faces from localized payloads.
|
||||
|
||||
Args:
|
||||
payloads: Raw localized Scryfall payloads for one print group.
|
||||
|
||||
Returns:
|
||||
None: The card faces are synchronized in place.
|
||||
"""
|
||||
self.ensure_one()
|
||||
card = self.with_context(
|
||||
mvd_tcg_bypass_validated_write=True,
|
||||
mvd_tcg_bypass_external_ref_write=True,
|
||||
)
|
||||
|
||||
primary_payload = self._mtg_scryfall_select_primary_payload(payloads)
|
||||
primary_face_payloads = self._mtg_scryfall_extract_face_payloads(primary_payload)
|
||||
existing_faces = {face.sequence: face for face in card.mtg_face_ids}
|
||||
kept_face_ids = []
|
||||
|
||||
for index, face_payload in enumerate(primary_face_payloads, start=1):
|
||||
values = {
|
||||
"card_id": self.id,
|
||||
"sequence": index,
|
||||
"name": face_payload.get("name") or _("Face %s") % index,
|
||||
"mana_cost": face_payload.get("mana_cost") or False,
|
||||
"type_line": face_payload.get("type_line") or False,
|
||||
"oracle_text": face_payload.get("oracle_text") or False,
|
||||
"flavor_text": face_payload.get("flavor_text") or False,
|
||||
"power": face_payload.get("power") or False,
|
||||
"toughness": face_payload.get("toughness") or False,
|
||||
"loyalty": face_payload.get("loyalty") or False,
|
||||
"artist": face_payload.get("artist") or False,
|
||||
}
|
||||
face = existing_faces.get(index)
|
||||
if face:
|
||||
face.with_context(
|
||||
mvd_tcg_bypass_validated_write=True,
|
||||
mvd_tcg_bypass_external_ref_write=True,
|
||||
).write(values)
|
||||
else:
|
||||
face = self.env["mvd.tcg.mtg.card.face"].create(values)
|
||||
kept_face_ids.append(face.id)
|
||||
|
||||
(
|
||||
card.mtg_face_ids
|
||||
- self.env["mvd.tcg.mtg.card.face"].browse(kept_face_ids)
|
||||
).with_context(mvd_tcg_bypass_validated_write=True).unlink()
|
||||
|
||||
for payload in payloads or ():
|
||||
card._mtg_scryfall_update_face_translations_from_payload(payload)
|
||||
|
||||
def _mtg_scryfall_sync_legalities(self, primary_payload):
|
||||
"""Create or update legality rows from one primary Scryfall payload.
|
||||
|
||||
Args:
|
||||
primary_payload: Canonical payload for the current print group.
|
||||
|
||||
Returns:
|
||||
None: The legality rows are synchronized in place.
|
||||
"""
|
||||
self.ensure_one()
|
||||
card = self.with_context(
|
||||
mvd_tcg_bypass_validated_write=True,
|
||||
mvd_tcg_bypass_external_ref_write=True,
|
||||
)
|
||||
|
||||
legality_payload = primary_payload.get("legalities") or {}
|
||||
format_records = self._mtg_scryfall_resolve_taxonomy_records(
|
||||
"mvd.tcg.mtg.format",
|
||||
legality_payload.keys(),
|
||||
create_missing=False,
|
||||
)
|
||||
existing_legalities = {
|
||||
legality.format_id.id: legality for legality in card.mtg_legality_ids
|
||||
}
|
||||
kept_legality_ids = []
|
||||
|
||||
for format_record in format_records:
|
||||
status = (legality_payload.get(format_record.code) or "").strip().lower() or "not_legal"
|
||||
values = {
|
||||
"card_id": self.id,
|
||||
"format_id": format_record.id,
|
||||
"status": status,
|
||||
}
|
||||
legality = existing_legalities.get(format_record.id)
|
||||
if legality:
|
||||
legality.with_context(mvd_tcg_bypass_validated_write=True).write(
|
||||
{"status": status}
|
||||
)
|
||||
else:
|
||||
legality = self.env["mvd.tcg.mtg.card.legality"].create(values)
|
||||
kept_legality_ids.append(legality.id)
|
||||
|
||||
(
|
||||
card.mtg_legality_ids
|
||||
- self.env["mvd.tcg.mtg.card.legality"].browse(kept_legality_ids)
|
||||
).with_context(mvd_tcg_bypass_validated_write=True).unlink()
|
||||
|
||||
@api.depends(
|
||||
"image_1920",
|
||||
"mtg_scryfall_image_ids",
|
||||
"mtg_scryfall_image_ids.language_code",
|
||||
"mtg_scryfall_image_ids.image_1920",
|
||||
)
|
||||
@api.depends_context("lang")
|
||||
def _compute_mtg_display_images(self):
|
||||
"""Render the best localized card image for the current UI language.
|
||||
|
||||
Returns:
|
||||
None: The compute updates records in place.
|
||||
"""
|
||||
for card in self:
|
||||
localized_images = {
|
||||
(image.language_code or "").strip().lower(): image.image_1920
|
||||
for image in card.mtg_scryfall_image_ids
|
||||
if image.image_1920
|
||||
}
|
||||
display_image = False
|
||||
for language_code in card._mtg_scryfall_get_preferred_image_language_codes():
|
||||
display_image = localized_images.get(language_code)
|
||||
if display_image:
|
||||
break
|
||||
display_image = display_image or card.image_1920
|
||||
card.mtg_display_image_1920 = display_image
|
||||
card.mtg_display_image_512 = display_image
|
||||
|
||||
@api.model
|
||||
def _mtg_scryfall_prepare_group_values(self, primary_payload, *, import_run=None):
|
||||
"""Prepare ORM values for one localized Scryfall print group.
|
||||
|
||||
Args:
|
||||
primary_payload: Canonical payload that drives the shared fields.
|
||||
import_run: Optional import run record for traceability.
|
||||
|
||||
Returns:
|
||||
dict: ORM values for create or write operations.
|
||||
"""
|
||||
mtg_game = self.env["mvd.tcg.game"]._mvd_tcg_get_mtg_game()
|
||||
mtg_set = self.env["mvd.tcg.mtg.set"].mtg_scryfall_upsert_from_payload(
|
||||
primary_payload,
|
||||
import_run=import_run,
|
||||
)
|
||||
rarity = self.env["mvd.tcg.mtg.rarity"].search(
|
||||
[("code", "=", (primary_payload.get("rarity") or "").strip().lower())],
|
||||
limit=1,
|
||||
)
|
||||
color_codes = [
|
||||
(color_code or "").strip().upper()
|
||||
for color_code in (primary_payload.get("colors") or [])
|
||||
if color_code
|
||||
]
|
||||
color_records = self.env["mvd.tcg.mtg.color"].search(
|
||||
[("code", "in", color_codes)]
|
||||
)
|
||||
color_identity_codes = [
|
||||
(color_code or "").strip().upper()
|
||||
for color_code in (primary_payload.get("color_identity") or [])
|
||||
if color_code
|
||||
]
|
||||
color_identity_records = self.env["mvd.tcg.mtg.color"].search(
|
||||
[("code", "in", color_identity_codes)]
|
||||
)
|
||||
card_type_codes = self._mtg_scryfall_extract_card_type_codes(primary_payload)
|
||||
card_type_records = self.env["mvd.tcg.mtg.card.type"].search(
|
||||
[("code", "in", card_type_codes)]
|
||||
)
|
||||
keyword_display_names = {
|
||||
self._mtg_scryfall_normalize_keyword_code(keyword_name): keyword_name
|
||||
for keyword_name in (primary_payload.get("keywords") or [])
|
||||
if self._mtg_scryfall_normalize_keyword_code(keyword_name)
|
||||
}
|
||||
keyword_records = self._mtg_scryfall_resolve_taxonomy_records(
|
||||
"mvd.tcg.mtg.keyword",
|
||||
keyword_display_names.keys(),
|
||||
display_names=keyword_display_names,
|
||||
create_missing=True,
|
||||
)
|
||||
platform_records = self._mtg_scryfall_resolve_taxonomy_records(
|
||||
"mvd.tcg.mtg.platform",
|
||||
primary_payload.get("games") or [],
|
||||
)
|
||||
finish_records = self._mtg_scryfall_resolve_taxonomy_records(
|
||||
"mvd.tcg.mtg.finish",
|
||||
primary_payload.get("finishes") or [],
|
||||
)
|
||||
|
||||
values = {
|
||||
"active": True,
|
||||
"external_ref": self._mtg_scryfall_build_external_ref(primary_payload),
|
||||
"game_id": mtg_game.id,
|
||||
"state": "validated",
|
||||
"mtg_scryfall_id": primary_payload.get("id") or False,
|
||||
"mtg_scryfall_uri": primary_payload.get("scryfall_uri") or False,
|
||||
"mtg_set_id": mtg_set.id,
|
||||
"mtg_rarity_id": rarity.id or False,
|
||||
"mtg_collector_number": primary_payload.get("collector_number") or False,
|
||||
"mtg_oracle_id": primary_payload.get("oracle_id") or False,
|
||||
"mtg_layout": primary_payload.get("layout") or False,
|
||||
"mtg_mana_cost": primary_payload.get("mana_cost") or False,
|
||||
"mtg_mana_value": primary_payload.get("cmc")
|
||||
if primary_payload.get("cmc") is not None
|
||||
else False,
|
||||
"mtg_color_ids": [Command.set(color_records.ids)],
|
||||
"mtg_color_identity_ids": [Command.set(color_identity_records.ids)],
|
||||
"mtg_card_type_ids": [Command.set(card_type_records.ids)],
|
||||
"mtg_keyword_ids": [Command.set(keyword_records.ids)],
|
||||
"mtg_game_platform_ids": [Command.set(platform_records.ids)],
|
||||
"mtg_finish_ids": [Command.set(finish_records.ids)],
|
||||
"mtg_power": primary_payload.get("power") or False,
|
||||
"mtg_toughness": primary_payload.get("toughness") or False,
|
||||
"mtg_loyalty": primary_payload.get("loyalty") or False,
|
||||
"mtg_artist": primary_payload.get("artist") or False,
|
||||
"mtg_is_token": bool(
|
||||
primary_payload.get("layout") == "token"
|
||||
or primary_payload.get("set_type") == "token"
|
||||
),
|
||||
"mtg_is_reprint": bool(primary_payload.get("reprint")),
|
||||
"mtg_is_promo": bool(primary_payload.get("promo")),
|
||||
"mtg_is_digital": bool(primary_payload.get("digital")),
|
||||
"mtg_is_full_art": bool(primary_payload.get("full_art")),
|
||||
"mtg_is_textless": bool(primary_payload.get("textless")),
|
||||
}
|
||||
if import_run:
|
||||
values.update(
|
||||
{
|
||||
"mtg_scryfall_last_import_run_id": import_run.id,
|
||||
"mtg_scryfall_last_synced_at": fields.Datetime.now(),
|
||||
}
|
||||
)
|
||||
return values
|
||||
|
||||
@api.model
|
||||
def mtg_scryfall_upsert_group_from_payloads(self, payloads, import_run=None):
|
||||
"""Create or update one MTG print group from localized payloads.
|
||||
|
||||
Args:
|
||||
payloads: Raw localized Scryfall payloads for one print group.
|
||||
import_run: Optional import run record for traceability.
|
||||
|
||||
Returns:
|
||||
mvd.tcg.card: Upserted MTG card reference record.
|
||||
"""
|
||||
grouped_payloads = [
|
||||
payload
|
||||
for payload in (payloads or [])
|
||||
if payload.get("object") == "card" and payload.get("id")
|
||||
]
|
||||
if not grouped_payloads:
|
||||
return self.browse()
|
||||
|
||||
primary_payload = self._mtg_scryfall_select_primary_payload(grouped_payloads)
|
||||
external_ref = self._mtg_scryfall_build_external_ref(primary_payload)
|
||||
if not external_ref or external_ref == ":":
|
||||
return self.browse()
|
||||
|
||||
mtg_game = self.env["mvd.tcg.game"]._mvd_tcg_get_mtg_game()
|
||||
card = self.search(
|
||||
[("game_id", "=", mtg_game.id), ("external_ref", "=", external_ref)],
|
||||
limit=1,
|
||||
)
|
||||
values = self._mtg_scryfall_prepare_group_values(
|
||||
primary_payload,
|
||||
import_run=import_run,
|
||||
)
|
||||
should_write_primary_values = bool(
|
||||
not card
|
||||
or (primary_payload.get("lang") or "").strip().lower() in {"", "en"}
|
||||
)
|
||||
if should_write_primary_values:
|
||||
values.update(
|
||||
{
|
||||
"name": primary_payload.get("name")
|
||||
or primary_payload.get("printed_name")
|
||||
or external_ref.upper(),
|
||||
"mtg_type_line": primary_payload.get("type_line") or False,
|
||||
"mtg_oracle_text": primary_payload.get("oracle_text") or False,
|
||||
"mtg_flavor_text": primary_payload.get("flavor_text") or False,
|
||||
}
|
||||
)
|
||||
|
||||
localized_images = self._mtg_scryfall_collect_group_images(grouped_payloads)
|
||||
image_base64 = (
|
||||
localized_images.get((primary_payload.get("lang") or "").strip().lower())
|
||||
or localized_images.get("en")
|
||||
or self._mtg_scryfall_load_group_image_base64(grouped_payloads)
|
||||
)
|
||||
if image_base64 and (not card or not card.image_1920):
|
||||
values["image_1920"] = image_base64
|
||||
|
||||
if card:
|
||||
card = card.with_context(
|
||||
mvd_tcg_bypass_validated_write=True,
|
||||
mvd_tcg_bypass_external_ref_write=True,
|
||||
)
|
||||
card.write(values)
|
||||
else:
|
||||
if "name" not in values:
|
||||
values["name"] = (
|
||||
primary_payload.get("name")
|
||||
or primary_payload.get("printed_name")
|
||||
or external_ref.upper()
|
||||
)
|
||||
card = self.with_context(
|
||||
mvd_tcg_bypass_external_ref_write=True,
|
||||
).create(values)
|
||||
|
||||
for payload in grouped_payloads:
|
||||
card._mtg_scryfall_update_translations_from_payload(payload)
|
||||
card._mtg_scryfall_sync_faces_from_payloads(grouped_payloads)
|
||||
card._mtg_scryfall_sync_legalities(primary_payload)
|
||||
card._mtg_scryfall_sync_localized_images(localized_images)
|
||||
return card
|
||||
|
||||
@api.model
|
||||
def mtg_scryfall_upsert_from_payload(self, payload):
|
||||
"""Create or update one MTG card reference from one Scryfall payload.
|
||||
|
||||
Args:
|
||||
payload: Raw Scryfall card payload from the public API.
|
||||
|
||||
Returns:
|
||||
mvd.tcg.card: Upserted MTG card reference record.
|
||||
"""
|
||||
return self.mtg_scryfall_upsert_group_from_payloads([payload])
|
||||
|
||||
def action_open_mtg_scryfall_last_import_run(self):
|
||||
"""Open the most recent Scryfall import run linked to this card.
|
||||
|
||||
Returns:
|
||||
dict | bool: Window action or ``False`` when no run is linked.
|
||||
"""
|
||||
self.ensure_one()
|
||||
if not self.mtg_scryfall_last_import_run_id:
|
||||
return False
|
||||
|
||||
return {
|
||||
"type": "ir.actions.act_window",
|
||||
"name": self.mtg_scryfall_last_import_run_id.display_name,
|
||||
"res_model": "mvd.tcg.mtg.scryfall.import.run",
|
||||
"view_mode": "form",
|
||||
"res_id": self.mtg_scryfall_last_import_run_id.id,
|
||||
"target": "current",
|
||||
}
|
||||
|
||||
def action_mtg_scryfall_refresh_card(self):
|
||||
"""Refresh the current MTG card from the Scryfall connector.
|
||||
|
||||
Returns:
|
||||
dict: Window action for the created refresh run.
|
||||
"""
|
||||
self.ensure_one()
|
||||
self.env["mvd.tcg.mtg.scryfall.api"]._mtg_scryfall_check_manager_access()
|
||||
refresh_run = self.env["mvd.tcg.mtg.scryfall.import.run"].create_card_refresh_run(
|
||||
self
|
||||
)
|
||||
return refresh_run.action_execute_import()
|
||||
29
models/mvd_tcg_mtg_card_image.py
Normal file
29
models/mvd_tcg_mtg_card_image.py
Normal file
@@ -0,0 +1,29 @@
|
||||
"""Localized Scryfall image records for MTG cards."""
|
||||
|
||||
from odoo import fields, models
|
||||
|
||||
|
||||
class MvdTcgMtgCardImage(models.Model):
|
||||
"""Store one localized imported Scryfall image per card and language."""
|
||||
|
||||
_name = "mvd.tcg.mtg.card.image"
|
||||
_description = "MTG Card Image"
|
||||
_order = "card_id, language_code, id"
|
||||
|
||||
card_id = fields.Many2one(
|
||||
"mvd.tcg.card",
|
||||
required=True,
|
||||
index=True,
|
||||
ondelete="cascade",
|
||||
)
|
||||
language_code = fields.Char(required=True, index=True)
|
||||
image_1920 = fields.Image(
|
||||
required=True,
|
||||
max_width=1920,
|
||||
max_height=1920,
|
||||
)
|
||||
|
||||
_card_language_unique = models.Constraint(
|
||||
"UNIQUE (card_id, language_code)",
|
||||
"Only one localized MTG image is allowed per card and language.",
|
||||
)
|
||||
933
models/mvd_tcg_mtg_scryfall_api.py
Normal file
933
models/mvd_tcg_mtg_scryfall_api.py
Normal file
@@ -0,0 +1,933 @@
|
||||
"""Scryfall API helpers for the MTG reference connector."""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import re
|
||||
from urllib import error, parse, request
|
||||
|
||||
from odoo import _, models
|
||||
from odoo.exceptions import UserError
|
||||
|
||||
from .constants import (
|
||||
DEFAULT_SCRYFALL_API_BASE_URL,
|
||||
DEFAULT_SCRYFALL_IMPORT_LANGUAGES,
|
||||
DEFAULT_SCRYFALL_TIMEOUT_SECONDS,
|
||||
DEFAULT_SCRYFALL_USER_AGENT,
|
||||
)
|
||||
SCRYFALL_ODOO_LANGUAGE_ALIASES = {
|
||||
"de": ("de_DE",),
|
||||
"en": ("en_US",),
|
||||
"es": ("es_ES", "es_419"),
|
||||
"fr": ("fr_FR",),
|
||||
"it": ("it_IT",),
|
||||
"ja": ("ja_JP",),
|
||||
"ko": ("ko_KR",),
|
||||
"pt": ("pt_PT", "pt_BR"),
|
||||
"ru": ("ru_RU",),
|
||||
"zhs": ("zh_CN",),
|
||||
"zht": ("zh_TW",),
|
||||
}
|
||||
SCRYFALL_LANGUAGE_CODES = {
|
||||
"de",
|
||||
"en",
|
||||
"es",
|
||||
"fr",
|
||||
"it",
|
||||
"ja",
|
||||
"ko",
|
||||
"ph",
|
||||
"pt",
|
||||
"ru",
|
||||
"zhs",
|
||||
"zht",
|
||||
}
|
||||
SCRYFALL_LOOKUP_MODES = {"exact", "fuzzy", "url"}
|
||||
_SCRYFALL_CARD_UUID_PATTERN = re.compile(
|
||||
r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def normalize_language_codes(raw_value, default=None):
|
||||
"""Normalize Scryfall language codes into a stable tuple.
|
||||
|
||||
Args:
|
||||
raw_value: Iterable or comma-separated language codes.
|
||||
default: Fallback language codes when the input is empty.
|
||||
|
||||
Returns:
|
||||
tuple[str, ...]: Deduplicated lower-case Scryfall language codes.
|
||||
"""
|
||||
if isinstance(raw_value, str):
|
||||
chunks = re.split(r"[\s,;]+", raw_value)
|
||||
else:
|
||||
chunks = list(raw_value or ())
|
||||
|
||||
values = []
|
||||
for chunk in chunks:
|
||||
code = str(chunk or "").strip().lower()
|
||||
if code and code not in values:
|
||||
values.append(code)
|
||||
|
||||
fallback = list(default or DEFAULT_SCRYFALL_IMPORT_LANGUAGES)
|
||||
return tuple(values or fallback)
|
||||
|
||||
|
||||
class MvdTcgMtgScryfallApi(models.AbstractModel):
|
||||
"""Provide Scryfall lookups for the MTG connector."""
|
||||
|
||||
_name = "mvd.tcg.mtg.scryfall.api"
|
||||
_description = "MTG Scryfall API"
|
||||
_SCRYFALL_ALLOWED_API_HOSTS = frozenset({"api.scryfall.com"})
|
||||
_SCRYFALL_ALLOWED_IMAGE_HOSTS = frozenset(
|
||||
{"api.scryfall.com", "cards.scryfall.io"}
|
||||
)
|
||||
_SCRYFALL_ALLOWED_IMAGE_HOST_SUFFIXES = (".scryfall.io",)
|
||||
|
||||
def _mtg_scryfall_check_manager_access(self):
|
||||
"""Require manager-level access for Scryfall connector actions."""
|
||||
if self.env.is_superuser() or any(
|
||||
self.env.user.has_group(xmlid)
|
||||
for xmlid in (
|
||||
"mvd_tcg_base.mvd_tcg_base_group_manager",
|
||||
"base.group_system",
|
||||
)
|
||||
):
|
||||
return
|
||||
raise UserError(_("Only TCG managers can run Scryfall connector actions."))
|
||||
|
||||
@classmethod
|
||||
def _validate_allowed_url(
|
||||
cls,
|
||||
url,
|
||||
*,
|
||||
label,
|
||||
allowed_hosts,
|
||||
allowed_host_suffixes=(),
|
||||
):
|
||||
"""Validate one configured or upstream URL against a strict allowlist.
|
||||
|
||||
Args:
|
||||
url: Absolute URL that should be validated.
|
||||
label: Human label used in error messages.
|
||||
allowed_hosts: Exact hostnames allowed for this URL class.
|
||||
allowed_host_suffixes: Optional allowed hostname suffixes.
|
||||
|
||||
Returns:
|
||||
str: Normalized absolute URL.
|
||||
|
||||
Raises:
|
||||
UserError: If the URL uses a disallowed scheme or host.
|
||||
"""
|
||||
normalized_url = (url or "").strip()
|
||||
parsed_url = parse.urlparse(normalized_url)
|
||||
hostname = (parsed_url.hostname or "").lower()
|
||||
if parsed_url.scheme != "https" or not hostname:
|
||||
raise UserError(
|
||||
_("%(label)s must use HTTPS and a trusted host.")
|
||||
% {"label": label}
|
||||
)
|
||||
if hostname in allowed_hosts:
|
||||
return normalized_url
|
||||
if any(hostname.endswith(suffix) for suffix in allowed_host_suffixes):
|
||||
return normalized_url
|
||||
raise UserError(
|
||||
_("%(label)s must use a trusted Scryfall host.")
|
||||
% {"label": label}
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _validate_api_url(cls, url):
|
||||
"""Validate one Scryfall API URL.
|
||||
|
||||
Args:
|
||||
url: Absolute API URL.
|
||||
|
||||
Returns:
|
||||
str: Validated absolute API URL.
|
||||
"""
|
||||
return cls._validate_allowed_url(
|
||||
url,
|
||||
label=_("Scryfall API URL"),
|
||||
allowed_hosts=cls._SCRYFALL_ALLOWED_API_HOSTS,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _validate_image_url(cls, url):
|
||||
"""Validate one Scryfall-hosted image URL.
|
||||
|
||||
Args:
|
||||
url: Absolute image URL.
|
||||
|
||||
Returns:
|
||||
str: Validated absolute image URL.
|
||||
"""
|
||||
return cls._validate_allowed_url(
|
||||
url,
|
||||
label=_("Scryfall image URL"),
|
||||
allowed_hosts=cls._SCRYFALL_ALLOWED_IMAGE_HOSTS,
|
||||
allowed_host_suffixes=cls._SCRYFALL_ALLOWED_IMAGE_HOST_SUFFIXES,
|
||||
)
|
||||
|
||||
def _get_config_parameter(self, name, default=False):
|
||||
"""Return one connector configuration parameter.
|
||||
|
||||
Args:
|
||||
name: Technical config parameter key.
|
||||
default: Fallback value when the parameter is unset.
|
||||
|
||||
Returns:
|
||||
str | bool: Stored config value or the provided fallback.
|
||||
"""
|
||||
return self.env["ir.config_parameter"].sudo().get_param(name, default)
|
||||
|
||||
def get_api_base_url(self):
|
||||
"""Return the configured Scryfall API base URL.
|
||||
|
||||
Returns:
|
||||
str: Absolute base URL for API requests.
|
||||
"""
|
||||
return self._validate_api_url(
|
||||
(
|
||||
self._get_config_parameter(
|
||||
"mvd_tcg_mtg_scryfall.api_base_url",
|
||||
DEFAULT_SCRYFALL_API_BASE_URL,
|
||||
)
|
||||
or DEFAULT_SCRYFALL_API_BASE_URL
|
||||
).strip()
|
||||
)
|
||||
|
||||
def get_timeout_seconds(self):
|
||||
"""Return the configured Scryfall request timeout.
|
||||
|
||||
Returns:
|
||||
int: Timeout in seconds.
|
||||
"""
|
||||
raw_value = self._get_config_parameter(
|
||||
"mvd_tcg_mtg_scryfall.timeout_seconds",
|
||||
DEFAULT_SCRYFALL_TIMEOUT_SECONDS,
|
||||
)
|
||||
try:
|
||||
return max(1, int(raw_value))
|
||||
except (TypeError, ValueError):
|
||||
return DEFAULT_SCRYFALL_TIMEOUT_SECONDS
|
||||
|
||||
def get_user_agent(self):
|
||||
"""Return the configured Scryfall HTTP user agent.
|
||||
|
||||
Returns:
|
||||
str: User agent string for API and image requests.
|
||||
"""
|
||||
return (
|
||||
self._get_config_parameter(
|
||||
"mvd_tcg_mtg_scryfall.user_agent",
|
||||
DEFAULT_SCRYFALL_USER_AGENT,
|
||||
)
|
||||
or DEFAULT_SCRYFALL_USER_AGENT
|
||||
).strip()
|
||||
|
||||
def get_default_max_cards_per_set(self):
|
||||
"""Return the default per-set card limit for controlled imports.
|
||||
|
||||
Returns:
|
||||
int: Configured limit or ``0`` for no limit.
|
||||
"""
|
||||
raw_value = self._get_config_parameter(
|
||||
"mvd_tcg_mtg_scryfall.import_max_cards_per_set",
|
||||
0,
|
||||
)
|
||||
try:
|
||||
return max(0, int(raw_value))
|
||||
except (TypeError, ValueError):
|
||||
return 0
|
||||
|
||||
def get_default_include_tokens(self):
|
||||
"""Return whether token cards should be included by default.
|
||||
|
||||
Returns:
|
||||
bool: ``True`` when controlled set imports should include tokens.
|
||||
"""
|
||||
raw_value = self._get_config_parameter(
|
||||
"mvd_tcg_mtg_scryfall.import_include_tokens",
|
||||
False,
|
||||
)
|
||||
return str(raw_value).strip().lower() in {"1", "true", "yes", "on"}
|
||||
|
||||
def _build_api_url(self, path, params=None):
|
||||
"""Build one absolute Scryfall API URL.
|
||||
|
||||
Args:
|
||||
path: Relative Scryfall API path.
|
||||
params: Optional query parameters.
|
||||
|
||||
Returns:
|
||||
str: Absolute API URL.
|
||||
"""
|
||||
url = f"{self.get_api_base_url().rstrip('/')}/{path.lstrip('/')}"
|
||||
if params:
|
||||
url = f"{url}?{parse.urlencode(params, doseq=True)}"
|
||||
return url
|
||||
|
||||
def _request_json(self, url):
|
||||
"""Perform one JSON request against the Scryfall API.
|
||||
|
||||
Args:
|
||||
url: Absolute API URL.
|
||||
|
||||
Returns:
|
||||
dict: Parsed JSON payload.
|
||||
|
||||
Raises:
|
||||
UserError: If Scryfall rejects the request or returns invalid JSON.
|
||||
"""
|
||||
validated_url = self._validate_api_url(url)
|
||||
http_request = request.Request(
|
||||
validated_url,
|
||||
headers={
|
||||
"User-Agent": self.get_user_agent(),
|
||||
"Accept": "application/json;q=0.9,*/*;q=0.8",
|
||||
},
|
||||
)
|
||||
try:
|
||||
with request.urlopen(
|
||||
http_request,
|
||||
timeout=self.get_timeout_seconds(),
|
||||
) as response:
|
||||
return json.loads(response.read().decode("utf-8"))
|
||||
except error.HTTPError as exc:
|
||||
details = exc.read().decode("utf-8", errors="replace")
|
||||
try:
|
||||
payload = json.loads(details)
|
||||
except json.JSONDecodeError:
|
||||
payload = {}
|
||||
message = payload.get("details") or payload.get("error") or details or str(exc)
|
||||
raise UserError(_("Scryfall lookup failed: %s") % message) from exc
|
||||
except (error.URLError, OSError, json.JSONDecodeError) as exc:
|
||||
raise UserError(_("Scryfall lookup failed: %s") % exc) from exc
|
||||
|
||||
def _request_json_path(self, path, params=None):
|
||||
"""Perform one JSON request against a relative Scryfall API path.
|
||||
|
||||
Args:
|
||||
path: Relative Scryfall API path.
|
||||
params: Optional query parameters.
|
||||
|
||||
Returns:
|
||||
dict: Parsed JSON payload.
|
||||
"""
|
||||
return self._request_json(self._build_api_url(path, params=params))
|
||||
|
||||
def _normalize_lookup_mode(self, lookup_mode):
|
||||
"""Return a validated lookup mode for card imports.
|
||||
|
||||
Args:
|
||||
lookup_mode: Raw lookup mode from a wizard or batch entry.
|
||||
|
||||
Returns:
|
||||
str: One of ``exact``, ``fuzzy`` or ``url``.
|
||||
|
||||
Raises:
|
||||
UserError: If the lookup mode is unknown.
|
||||
"""
|
||||
normalized_mode = (lookup_mode or "").strip().lower()
|
||||
if normalized_mode not in SCRYFALL_LOOKUP_MODES:
|
||||
raise UserError(_("Unsupported Scryfall lookup mode: %s") % (lookup_mode or ""))
|
||||
return normalized_mode
|
||||
|
||||
def map_translation_language_codes(self, scryfall_language_code):
|
||||
"""Map one Scryfall language code to active Odoo languages.
|
||||
|
||||
Args:
|
||||
scryfall_language_code: Raw Scryfall language code such as ``de``.
|
||||
|
||||
Returns:
|
||||
list[str]: Active Odoo language codes for the given Scryfall code.
|
||||
"""
|
||||
normalized_scryfall_code = (scryfall_language_code or "").strip().lower()
|
||||
if not normalized_scryfall_code:
|
||||
return []
|
||||
|
||||
installed_codes = {
|
||||
code.lower().replace("-", "_"): code
|
||||
for code in self.env["res.lang"].search([("active", "=", True)]).mapped("code")
|
||||
}
|
||||
resolved_codes = []
|
||||
for alias in SCRYFALL_ODOO_LANGUAGE_ALIASES.get(normalized_scryfall_code, ()):
|
||||
normalized_alias = alias.lower().replace("-", "_")
|
||||
installed_code = installed_codes.get(normalized_alias)
|
||||
if installed_code and installed_code not in resolved_codes:
|
||||
resolved_codes.append(installed_code)
|
||||
for normalized_code, installed_code in installed_codes.items():
|
||||
if normalized_code.split("_", 1)[0] == normalized_scryfall_code:
|
||||
if installed_code not in resolved_codes:
|
||||
resolved_codes.append(installed_code)
|
||||
return resolved_codes
|
||||
|
||||
def map_scryfall_language_code(self, odoo_language_code):
|
||||
"""Map one Odoo language code to a Scryfall language code.
|
||||
|
||||
Args:
|
||||
odoo_language_code: Odoo language code such as ``de_DE``.
|
||||
|
||||
Returns:
|
||||
str | bool: Matching Scryfall language code or ``False``.
|
||||
"""
|
||||
normalized_odoo_code = (
|
||||
(odoo_language_code or "").strip().lower().replace("-", "_")
|
||||
)
|
||||
if not normalized_odoo_code:
|
||||
return False
|
||||
|
||||
for scryfall_code, aliases in SCRYFALL_ODOO_LANGUAGE_ALIASES.items():
|
||||
normalized_aliases = {
|
||||
alias.lower().replace("-", "_") for alias in aliases
|
||||
}
|
||||
if normalized_odoo_code in normalized_aliases:
|
||||
return scryfall_code
|
||||
|
||||
base_language = normalized_odoo_code.split("_", 1)[0]
|
||||
return base_language if base_language in SCRYFALL_LANGUAGE_CODES else False
|
||||
|
||||
def get_import_language_codes(self):
|
||||
"""Return active Scryfall language codes that should be imported.
|
||||
|
||||
Returns:
|
||||
tuple[str, ...]: Normalized Scryfall language codes.
|
||||
"""
|
||||
configured_languages = self._get_config_parameter(
|
||||
"mvd_tcg_mtg_scryfall.import_language_codes",
|
||||
False,
|
||||
)
|
||||
return normalize_language_codes(
|
||||
configured_languages or DEFAULT_SCRYFALL_IMPORT_LANGUAGES
|
||||
)
|
||||
|
||||
def _parse_card_url_or_id(self, identifier):
|
||||
"""Parse one Scryfall URL or UUID into a reusable lookup descriptor.
|
||||
|
||||
Args:
|
||||
identifier: Human card URL, API URL or raw Scryfall UUID.
|
||||
|
||||
Returns:
|
||||
dict[str, object]: Parsed descriptor containing request path.
|
||||
|
||||
Raises:
|
||||
UserError: If the identifier is not a supported Scryfall reference.
|
||||
"""
|
||||
normalized_identifier = (identifier or "").strip()
|
||||
if not normalized_identifier:
|
||||
raise UserError(_("Enter a Scryfall card URL or card id."))
|
||||
|
||||
if _SCRYFALL_CARD_UUID_PATTERN.match(normalized_identifier):
|
||||
return {
|
||||
"lookup_mode": "url",
|
||||
"query": normalized_identifier,
|
||||
"path": f"/cards/{normalized_identifier}",
|
||||
"params": None,
|
||||
}
|
||||
|
||||
parsed_url = parse.urlparse(normalized_identifier)
|
||||
hostname = (parsed_url.hostname or "").lower()
|
||||
path_segments = [segment for segment in parsed_url.path.split("/") if segment]
|
||||
if hostname not in {"scryfall.com", "www.scryfall.com", "api.scryfall.com"}:
|
||||
raise UserError(_("Enter a Scryfall card URL or card id."))
|
||||
if not path_segments:
|
||||
raise UserError(_("Enter a Scryfall card URL or card id."))
|
||||
|
||||
if path_segments[0] == "cards" and len(path_segments) >= 2:
|
||||
return {
|
||||
"lookup_mode": "url",
|
||||
"query": normalized_identifier,
|
||||
"path": f"/cards/{parse.quote(path_segments[1])}",
|
||||
"params": None,
|
||||
}
|
||||
|
||||
if path_segments[0] == "card" and len(path_segments) >= 3:
|
||||
set_code = parse.quote(path_segments[1])
|
||||
collector_number = parse.quote(path_segments[2], safe="")
|
||||
path = f"/cards/{set_code}/{collector_number}"
|
||||
language_code = (path_segments[3] or "").lower() if len(path_segments) >= 4 else ""
|
||||
if language_code in SCRYFALL_LANGUAGE_CODES:
|
||||
path = f"{path}/{parse.quote(path_segments[3])}"
|
||||
return {
|
||||
"lookup_mode": "url",
|
||||
"query": normalized_identifier,
|
||||
"path": path,
|
||||
"params": None,
|
||||
}
|
||||
|
||||
raise UserError(_("Enter a Scryfall card URL or card id."))
|
||||
|
||||
def parse_lookup_descriptor(self, query, lookup_mode):
|
||||
"""Parse one card lookup into a reusable request descriptor.
|
||||
|
||||
Args:
|
||||
query: Raw card lookup query from the user.
|
||||
lookup_mode: Lookup mode such as ``exact``, ``fuzzy`` or ``url``.
|
||||
|
||||
Returns:
|
||||
dict[str, object]: Parsed descriptor with request path and parameters.
|
||||
|
||||
Raises:
|
||||
UserError: If the query is empty or malformed for the chosen mode.
|
||||
"""
|
||||
normalized_query = (query or "").strip()
|
||||
if not normalized_query:
|
||||
raise UserError(_("Enter a card name or Scryfall card URL first."))
|
||||
|
||||
normalized_mode = self._normalize_lookup_mode(lookup_mode)
|
||||
if normalized_mode == "url":
|
||||
return self._parse_card_url_or_id(normalized_query)
|
||||
|
||||
return {
|
||||
"lookup_mode": normalized_mode,
|
||||
"query": normalized_query,
|
||||
"path": "/cards/named",
|
||||
"params": {normalized_mode: normalized_query},
|
||||
}
|
||||
|
||||
def load_image_base64(self, image_url):
|
||||
"""Download one card image and return it in Odoo's base64 format.
|
||||
|
||||
Args:
|
||||
image_url: Absolute image URL from Scryfall.
|
||||
|
||||
Returns:
|
||||
str | bool: Base64-encoded image data or ``False`` on download issues.
|
||||
"""
|
||||
if not image_url:
|
||||
return False
|
||||
validated_image_url = self._validate_image_url(image_url)
|
||||
|
||||
http_request = request.Request(
|
||||
validated_image_url,
|
||||
headers={
|
||||
"User-Agent": self.get_user_agent(),
|
||||
"Accept": "image/avif,image/webp,image/*,*/*;q=0.8",
|
||||
},
|
||||
)
|
||||
try:
|
||||
with request.urlopen(
|
||||
http_request,
|
||||
timeout=self.get_timeout_seconds(),
|
||||
) as response:
|
||||
return base64.b64encode(response.read()).decode("ascii")
|
||||
except (error.URLError, OSError):
|
||||
return False
|
||||
|
||||
def lookup_card_payload(self, query, lookup_mode):
|
||||
"""Resolve one MTG card payload from a parsed lookup descriptor.
|
||||
|
||||
Args:
|
||||
query: Raw lookup query.
|
||||
lookup_mode: Lookup mode such as ``exact``, ``fuzzy`` or ``url``.
|
||||
|
||||
Returns:
|
||||
dict: Raw Scryfall card payload.
|
||||
"""
|
||||
self._mtg_scryfall_check_manager_access()
|
||||
descriptor = self.parse_lookup_descriptor(query, lookup_mode)
|
||||
return self._request_json_path(
|
||||
descriptor["path"],
|
||||
params=descriptor.get("params"),
|
||||
)
|
||||
|
||||
def lookup_card_payloads(self, query, lookup_mode, languages=None):
|
||||
"""Resolve one card lookup and expand it to localized print payloads.
|
||||
|
||||
Args:
|
||||
query: Raw lookup query.
|
||||
lookup_mode: Lookup mode such as ``exact``, ``fuzzy`` or ``url``.
|
||||
languages: Optional import language scope.
|
||||
|
||||
Returns:
|
||||
list[dict]: Deduplicated localized payloads for one print group.
|
||||
"""
|
||||
payload = self.lookup_card_payload(query, lookup_mode)
|
||||
return self.get_localized_print_payloads(payload, languages=languages)
|
||||
|
||||
def lookup_exact(self, card_name):
|
||||
"""Look up one card by exact name on Scryfall."""
|
||||
return self.lookup_card_payload(card_name, "exact")
|
||||
|
||||
def lookup_fuzzy(self, card_name):
|
||||
"""Look up one card by fuzzy name on Scryfall."""
|
||||
return self.lookup_card_payload(card_name, "fuzzy")
|
||||
|
||||
def lookup_by_url_or_id(self, identifier):
|
||||
"""Look up one card from a Scryfall URL or card UUID."""
|
||||
return self.lookup_card_payload(identifier, "url")
|
||||
|
||||
def iter_search_card_payloads(self, query, *, unique="cards", limit=None, include_multilingual=False, order=None, direction=None):
|
||||
"""Yield card payloads from the paginated Scryfall search API.
|
||||
|
||||
Args:
|
||||
query: Raw Scryfall search query.
|
||||
unique: Requested Scryfall uniqueness mode.
|
||||
limit: Optional maximum number of returned payloads.
|
||||
include_multilingual: Whether Scryfall should include multilingual prints.
|
||||
order: Optional Scryfall ordering key.
|
||||
direction: Optional Scryfall sort direction.
|
||||
|
||||
Yields:
|
||||
dict: Card payloads from the result stream.
|
||||
"""
|
||||
self._mtg_scryfall_check_manager_access()
|
||||
query_params = {
|
||||
"q": (query or "").strip(),
|
||||
"unique": unique,
|
||||
}
|
||||
if include_multilingual:
|
||||
query_params["include_multilingual"] = "true"
|
||||
if order:
|
||||
query_params["order"] = order
|
||||
if direction:
|
||||
query_params["dir"] = direction
|
||||
next_url = self._build_api_url("/cards/search", query_params)
|
||||
remaining_limit = int(limit) if limit and int(limit) > 0 else None
|
||||
|
||||
while next_url:
|
||||
page_payload = self._request_json(next_url)
|
||||
page_card_payloads = [
|
||||
card_payload
|
||||
for card_payload in page_payload.get("data", [])
|
||||
if card_payload.get("object") == "card" and card_payload.get("id")
|
||||
]
|
||||
if remaining_limit is not None:
|
||||
page_card_payloads = page_card_payloads[:remaining_limit]
|
||||
|
||||
for card_payload in page_card_payloads:
|
||||
yield card_payload
|
||||
|
||||
if remaining_limit is not None:
|
||||
remaining_limit -= len(page_card_payloads)
|
||||
if remaining_limit <= 0:
|
||||
break
|
||||
if not page_payload.get("has_more"):
|
||||
break
|
||||
next_url = page_payload.get("next_page") or ""
|
||||
|
||||
def search_cards(self, query, *, unique="cards", limit=None, include_multilingual=False):
|
||||
"""Search card payloads on Scryfall with optional pagination.
|
||||
|
||||
Args:
|
||||
query: Raw Scryfall search query.
|
||||
unique: Requested Scryfall uniqueness mode.
|
||||
limit: Optional maximum number of returned payloads.
|
||||
include_multilingual: Whether Scryfall should include multilingual prints.
|
||||
|
||||
Returns:
|
||||
list[dict]: Card payloads returned by the Scryfall search API.
|
||||
"""
|
||||
return list(
|
||||
self.iter_search_card_payloads(
|
||||
query,
|
||||
unique=unique,
|
||||
limit=limit,
|
||||
include_multilingual=include_multilingual,
|
||||
)
|
||||
)
|
||||
|
||||
def fetch_search_results_by_url(self, search_url, *, limit=None):
|
||||
"""Fetch paginated Scryfall search results from an absolute URL.
|
||||
|
||||
Args:
|
||||
search_url: Absolute search URL such as ``prints_search_uri``.
|
||||
limit: Optional maximum number of returned payloads.
|
||||
|
||||
Returns:
|
||||
list[dict]: Card payloads from the paginated result stream.
|
||||
"""
|
||||
self._mtg_scryfall_check_manager_access()
|
||||
if search_url:
|
||||
parsed_url = parse.urlsplit(search_url)
|
||||
query_params = parse.parse_qsl(parsed_url.query, keep_blank_values=True)
|
||||
query_params = [
|
||||
(key, value)
|
||||
for key, value in query_params
|
||||
if key != "include_multilingual"
|
||||
]
|
||||
query_params.append(("include_multilingual", "true"))
|
||||
search_url = parse.urlunsplit(
|
||||
(
|
||||
parsed_url.scheme,
|
||||
parsed_url.netloc,
|
||||
parsed_url.path,
|
||||
parse.urlencode(query_params),
|
||||
parsed_url.fragment,
|
||||
)
|
||||
)
|
||||
|
||||
next_url = (search_url or "").strip()
|
||||
remaining_limit = int(limit) if limit and int(limit) > 0 else None
|
||||
payloads = []
|
||||
while next_url:
|
||||
page_payload = self._request_json(next_url)
|
||||
page_card_payloads = [
|
||||
card_payload
|
||||
for card_payload in page_payload.get("data", [])
|
||||
if card_payload.get("object") == "card" and card_payload.get("id")
|
||||
]
|
||||
if remaining_limit is not None:
|
||||
page_card_payloads = page_card_payloads[:remaining_limit]
|
||||
payloads.extend(page_card_payloads)
|
||||
if remaining_limit is not None:
|
||||
remaining_limit -= len(page_card_payloads)
|
||||
if remaining_limit <= 0:
|
||||
break
|
||||
if not page_payload.get("has_more"):
|
||||
break
|
||||
next_url = page_payload.get("next_page") or ""
|
||||
return payloads
|
||||
|
||||
def get_localized_print_payloads(self, payload, *, languages=None):
|
||||
"""Return localized payloads for one MTG print group.
|
||||
|
||||
Args:
|
||||
payload: Seed Scryfall card payload.
|
||||
languages: Optional iterable of Scryfall language codes.
|
||||
|
||||
Returns:
|
||||
list[dict]: Deduplicated payloads for the same set and collector number.
|
||||
"""
|
||||
if payload.get("object") != "card" or not payload.get("id"):
|
||||
return []
|
||||
|
||||
import_language_codes = normalize_language_codes(
|
||||
languages,
|
||||
default=self.get_import_language_codes(),
|
||||
)
|
||||
wanted_languages = set(import_language_codes)
|
||||
same_print_payloads = [payload]
|
||||
search_url = payload.get("prints_search_uri")
|
||||
if search_url:
|
||||
same_print_payloads.extend(
|
||||
candidate_payload
|
||||
for candidate_payload in self.fetch_search_results_by_url(search_url)
|
||||
if (
|
||||
candidate_payload.get("set") == payload.get("set")
|
||||
and candidate_payload.get("collector_number")
|
||||
== payload.get("collector_number")
|
||||
)
|
||||
)
|
||||
|
||||
deduplicated_payloads = {}
|
||||
for candidate_payload in same_print_payloads:
|
||||
candidate_id = candidate_payload.get("id")
|
||||
candidate_language = (candidate_payload.get("lang") or "").strip().lower()
|
||||
if not candidate_id:
|
||||
continue
|
||||
if wanted_languages and candidate_language and candidate_language not in wanted_languages:
|
||||
continue
|
||||
deduplicated_payloads[candidate_id] = candidate_payload
|
||||
|
||||
language_sort_order = {
|
||||
code: index
|
||||
for index, code in enumerate(import_language_codes)
|
||||
}
|
||||
return sorted(
|
||||
deduplicated_payloads.values(),
|
||||
key=lambda candidate_payload: (
|
||||
language_sort_order.get(
|
||||
(candidate_payload.get("lang") or "").strip().lower(),
|
||||
999,
|
||||
),
|
||||
candidate_payload.get("lang") or "",
|
||||
candidate_payload.get("id") or "",
|
||||
),
|
||||
)
|
||||
|
||||
def expand_payloads_with_localized_prints(self, payloads, *, languages=None):
|
||||
"""Expand seed payloads to localized print payloads.
|
||||
|
||||
Args:
|
||||
payloads: Seed Scryfall card payloads.
|
||||
languages: Optional iterable of Scryfall language codes.
|
||||
|
||||
Returns:
|
||||
list[dict]: Deduplicated localized payloads.
|
||||
"""
|
||||
expanded_payloads = {}
|
||||
for payload in payloads:
|
||||
for localized_payload in self.get_localized_print_payloads(
|
||||
payload,
|
||||
languages=languages,
|
||||
):
|
||||
expanded_payloads[localized_payload["id"]] = localized_payload
|
||||
return list(expanded_payloads.values())
|
||||
|
||||
def resolve_lookup_payload(self, query, lookup_mode):
|
||||
"""Resolve one Scryfall lookup input to a seed payload."""
|
||||
return self.lookup_card_payload(query, lookup_mode)
|
||||
|
||||
def resolve_lookup_payloads(self, query, lookup_mode, languages=None):
|
||||
"""Resolve one lookup query and expand it to localized payloads."""
|
||||
return self.lookup_card_payloads(query, lookup_mode, languages=languages)
|
||||
|
||||
def parse_batch_queries(self, raw_batch_input, default_lookup_mode="url"):
|
||||
"""Parse a multiline batch input into normalized lookup entries.
|
||||
|
||||
Each non-empty line is treated as one lookup item. Lines may optionally
|
||||
start with ``url:``, ``exact:`` or ``fuzzy:`` to override the default
|
||||
lookup mode for that line.
|
||||
|
||||
Args:
|
||||
raw_batch_input: Multiline raw text from the import wizard.
|
||||
default_lookup_mode: Fallback lookup mode for plain lines.
|
||||
|
||||
Returns:
|
||||
list[dict[str, str]]: Cleaned batch entries with ``query`` and
|
||||
``lookup_mode`` keys.
|
||||
"""
|
||||
normalized_default_mode = self._normalize_lookup_mode(default_lookup_mode)
|
||||
parsed_queries = []
|
||||
for raw_line in (raw_batch_input or "").splitlines():
|
||||
normalized_line = raw_line.strip()
|
||||
if not normalized_line or normalized_line.startswith("#"):
|
||||
continue
|
||||
|
||||
lookup_mode = normalized_default_mode
|
||||
query = normalized_line
|
||||
if ":" in normalized_line:
|
||||
prefix, suffix = normalized_line.split(":", 1)
|
||||
normalized_prefix = prefix.strip().lower()
|
||||
if normalized_prefix in SCRYFALL_LOOKUP_MODES and suffix.strip():
|
||||
lookup_mode = normalized_prefix
|
||||
query = suffix.strip()
|
||||
|
||||
parsed_queries.append(
|
||||
{
|
||||
"lookup_mode": lookup_mode,
|
||||
"query": query,
|
||||
}
|
||||
)
|
||||
return parsed_queries
|
||||
|
||||
def parse_set_codes(self, raw_text):
|
||||
"""Parse one or many MTG set codes from free-text input.
|
||||
|
||||
Args:
|
||||
raw_text: Multiline or comma-separated set-code input.
|
||||
|
||||
Returns:
|
||||
list[str]: Deduplicated lower-case set codes.
|
||||
"""
|
||||
set_codes = []
|
||||
for chunk in re.split(r"[\s,;]+", raw_text or ""):
|
||||
set_code = (chunk or "").strip().lower()
|
||||
if set_code and set_code not in set_codes:
|
||||
set_codes.append(set_code)
|
||||
return set_codes
|
||||
|
||||
def fetch_set_seed_payloads(self, set_code, *, card_limit=0, include_tokens=False):
|
||||
"""Fetch seed payloads for one MTG set code.
|
||||
|
||||
Args:
|
||||
set_code: Scryfall set code such as ``tdm``.
|
||||
card_limit: Optional maximum number of returned cards.
|
||||
include_tokens: Whether token cards should be included.
|
||||
|
||||
Returns:
|
||||
list[dict]: Seed payloads for the requested set import.
|
||||
"""
|
||||
normalized_set_code = (set_code or "").strip().lower()
|
||||
if not normalized_set_code:
|
||||
raise UserError(_("Enter a set code first."))
|
||||
|
||||
query_terms = [f"e:{normalized_set_code}", "game:paper"]
|
||||
if not include_tokens:
|
||||
query_terms.append("-is:token")
|
||||
|
||||
seed_payloads = list(
|
||||
self.iter_search_card_payloads(
|
||||
" ".join(query_terms),
|
||||
unique="prints",
|
||||
limit=card_limit or None,
|
||||
order="set",
|
||||
direction="asc",
|
||||
)
|
||||
)
|
||||
if not seed_payloads:
|
||||
raise UserError(
|
||||
_("Scryfall returned no cards for the set code %s.")
|
||||
% normalized_set_code.upper()
|
||||
)
|
||||
return seed_payloads
|
||||
|
||||
def get_set_print_group_payloads(
|
||||
self,
|
||||
set_code,
|
||||
*,
|
||||
languages=None,
|
||||
limit=None,
|
||||
include_tokens=False,
|
||||
):
|
||||
"""Return localized payload groups for one MTG set import.
|
||||
|
||||
Args:
|
||||
set_code: Raw MTG set code.
|
||||
languages: Optional iterable of wanted Scryfall language codes.
|
||||
limit: Optional maximum number of imported print groups.
|
||||
include_tokens: Whether token cards should be included.
|
||||
|
||||
Returns:
|
||||
list[list[dict]]: Localized payload groups keyed by print reference.
|
||||
"""
|
||||
import_language_codes = normalize_language_codes(
|
||||
languages,
|
||||
default=self.get_import_language_codes(),
|
||||
)
|
||||
wanted_languages = set(import_language_codes)
|
||||
grouped_payloads = {}
|
||||
card_model = self.env["mvd.tcg.card"]
|
||||
normalized_set_code = (set_code or "").strip().lower()
|
||||
if not normalized_set_code:
|
||||
raise UserError(_("Enter a set code first."))
|
||||
|
||||
query_terms = [f"e:{normalized_set_code}", "game:paper"]
|
||||
if not include_tokens:
|
||||
query_terms.append("-is:token")
|
||||
|
||||
for payload in self.iter_search_card_payloads(
|
||||
" ".join(query_terms),
|
||||
unique="prints",
|
||||
include_multilingual=True,
|
||||
order="set",
|
||||
direction="asc",
|
||||
):
|
||||
external_ref = card_model._mtg_scryfall_build_external_ref(payload)
|
||||
if not external_ref or external_ref == ":":
|
||||
continue
|
||||
payload_language = (payload.get("lang") or "").strip().lower()
|
||||
if wanted_languages and payload_language and payload_language not in wanted_languages:
|
||||
continue
|
||||
grouped_payloads.setdefault(external_ref, {})[payload.get("id")] = payload
|
||||
|
||||
sorted_group_payloads = []
|
||||
language_sort_order = {
|
||||
code: index
|
||||
for index, code in enumerate(import_language_codes)
|
||||
}
|
||||
for external_ref in sorted(grouped_payloads):
|
||||
localized_payloads = sorted(
|
||||
grouped_payloads[external_ref].values(),
|
||||
key=lambda candidate_payload: (
|
||||
language_sort_order.get(
|
||||
(candidate_payload.get("lang") or "").strip().lower(),
|
||||
999,
|
||||
),
|
||||
candidate_payload.get("lang") or "",
|
||||
candidate_payload.get("id") or "",
|
||||
),
|
||||
)
|
||||
if localized_payloads:
|
||||
sorted_group_payloads.append(localized_payloads)
|
||||
if limit and len(sorted_group_payloads) >= int(limit):
|
||||
break
|
||||
|
||||
if not sorted_group_payloads:
|
||||
raise UserError(
|
||||
_("Scryfall returned no cards for the set code %s.")
|
||||
% normalized_set_code.upper()
|
||||
)
|
||||
return sorted_group_payloads
|
||||
514
models/mvd_tcg_mtg_scryfall_import_run.py
Normal file
514
models/mvd_tcg_mtg_scryfall_import_run.py
Normal file
@@ -0,0 +1,514 @@
|
||||
"""Controlled set and batch import runs for the MTG Scryfall connector."""
|
||||
|
||||
from odoo import _, api, fields, models
|
||||
from odoo.exceptions import UserError
|
||||
|
||||
from .mvd_tcg_mtg_scryfall_api import normalize_language_codes
|
||||
|
||||
|
||||
class MvdTcgMtgScryfallImportRun(models.Model):
|
||||
"""Track controlled MTG Scryfall set and batch imports."""
|
||||
|
||||
_name = "mvd.tcg.mtg.scryfall.import.run"
|
||||
_description = "MTG Scryfall Import Run"
|
||||
_order = "id desc"
|
||||
|
||||
name = fields.Char(required=True, readonly=True, copy=False)
|
||||
import_mode = fields.Selection(
|
||||
selection=[
|
||||
("set", "Set Import"),
|
||||
("batch", "Batch Import"),
|
||||
],
|
||||
required=True,
|
||||
readonly=True,
|
||||
)
|
||||
lookup_mode = fields.Selection(
|
||||
selection=[
|
||||
("url", "Direct Scryfall URL or card id"),
|
||||
("exact", "Exact card name"),
|
||||
("fuzzy", "Fuzzy card name"),
|
||||
],
|
||||
readonly=True,
|
||||
)
|
||||
state = fields.Selection(
|
||||
selection=[
|
||||
("queued", "Queued"),
|
||||
("running", "Running"),
|
||||
("done", "Done"),
|
||||
("failed", "Failed"),
|
||||
],
|
||||
default="queued",
|
||||
readonly=True,
|
||||
required=True,
|
||||
)
|
||||
stage = fields.Selection(
|
||||
selection=[
|
||||
("queued", "Queued"),
|
||||
("prepare", "Preparing"),
|
||||
("fetch", "Fetching"),
|
||||
("import", "Importing"),
|
||||
("finished", "Finished"),
|
||||
("failed", "Failed"),
|
||||
],
|
||||
default="queued",
|
||||
readonly=True,
|
||||
)
|
||||
requested_by_id = fields.Many2one(
|
||||
"res.users",
|
||||
string="Requested By",
|
||||
readonly=True,
|
||||
default=lambda self: self.env.user,
|
||||
)
|
||||
started_at = fields.Datetime(readonly=True)
|
||||
finished_at = fields.Datetime(readonly=True)
|
||||
input_text = fields.Text(readonly=True)
|
||||
language_codes = fields.Char(readonly=True)
|
||||
max_cards_per_set = fields.Integer(readonly=True)
|
||||
include_tokens = fields.Boolean(readonly=True)
|
||||
progress_current = fields.Integer(readonly=True)
|
||||
progress_total = fields.Integer(readonly=True)
|
||||
progress_percent = fields.Float(readonly=True)
|
||||
set_count = fields.Integer(readonly=True)
|
||||
item_count = fields.Integer(readonly=True)
|
||||
cards_created = fields.Integer(readonly=True)
|
||||
cards_updated = fields.Integer(readonly=True)
|
||||
sets_created = fields.Integer(readonly=True)
|
||||
sets_updated = fields.Integer(readonly=True)
|
||||
log_text = fields.Text(readonly=True)
|
||||
error_message = fields.Text(readonly=True)
|
||||
|
||||
@api.model_create_multi
|
||||
def create(self, vals_list):
|
||||
"""Create import runs with generated display names when needed.
|
||||
|
||||
Args:
|
||||
vals_list: Raw values for one or many import runs.
|
||||
|
||||
Returns:
|
||||
mvd.tcg.mtg.scryfall.import.run: Created import runs.
|
||||
"""
|
||||
timestamp = fields.Datetime.now()
|
||||
enriched_vals_list = []
|
||||
for vals in vals_list:
|
||||
enriched_vals = dict(vals)
|
||||
if not enriched_vals.get("name"):
|
||||
enriched_vals["name"] = _("MTG Scryfall Import %s") % timestamp
|
||||
enriched_vals_list.append(enriched_vals)
|
||||
return super().create(enriched_vals_list)
|
||||
|
||||
@api.model
|
||||
def create_card_refresh_run(self, card, *, language_codes=None):
|
||||
"""Create one refresh run for a single existing MTG card.
|
||||
|
||||
Args:
|
||||
card: Existing MTG card to refresh from Scryfall.
|
||||
language_codes: Optional comma-separated or iterable language scope.
|
||||
|
||||
Returns:
|
||||
mvd.tcg.mtg.scryfall.import.run: Prepared batch-style refresh run.
|
||||
"""
|
||||
card.ensure_one()
|
||||
self.env["mvd.tcg.mtg.scryfall.api"]._mtg_scryfall_check_manager_access()
|
||||
refresh_query = card._mtg_scryfall_get_refresh_query()
|
||||
if not refresh_query:
|
||||
raise UserError(_("This card cannot be refreshed from Scryfall yet."))
|
||||
|
||||
resolved_languages = normalize_language_codes(
|
||||
language_codes,
|
||||
default=self.env["mvd.tcg.mtg.scryfall.api"].get_import_language_codes(),
|
||||
)
|
||||
return self.create(
|
||||
{
|
||||
"name": _("Refresh %s") % card.display_name,
|
||||
"import_mode": "batch",
|
||||
"lookup_mode": "url",
|
||||
"input_text": refresh_query,
|
||||
"language_codes": ",".join(resolved_languages),
|
||||
}
|
||||
)
|
||||
|
||||
@api.model
|
||||
def create_set_refresh_run(
|
||||
self,
|
||||
mtg_set,
|
||||
*,
|
||||
language_codes=None,
|
||||
max_cards_per_set=0,
|
||||
include_tokens=False,
|
||||
):
|
||||
"""Create one refresh run for a single existing MTG set.
|
||||
|
||||
Args:
|
||||
mtg_set: Existing MTG set to refresh from Scryfall.
|
||||
language_codes: Optional comma-separated or iterable language scope.
|
||||
max_cards_per_set: Optional limit for the refreshed print groups.
|
||||
include_tokens: Whether token cards should be included.
|
||||
|
||||
Returns:
|
||||
mvd.tcg.mtg.scryfall.import.run: Prepared set refresh run.
|
||||
"""
|
||||
mtg_set.ensure_one()
|
||||
self.env["mvd.tcg.mtg.scryfall.api"]._mtg_scryfall_check_manager_access()
|
||||
if not mtg_set.code:
|
||||
raise UserError(_("This set has no MTG set code yet."))
|
||||
|
||||
resolved_languages = normalize_language_codes(
|
||||
language_codes,
|
||||
default=self.env["mvd.tcg.mtg.scryfall.api"].get_import_language_codes(),
|
||||
)
|
||||
return self.create(
|
||||
{
|
||||
"name": _("Refresh Set %s") % mtg_set.code.upper(),
|
||||
"import_mode": "set",
|
||||
"input_text": mtg_set.code,
|
||||
"language_codes": ",".join(resolved_languages),
|
||||
"max_cards_per_set": max_cards_per_set or 0,
|
||||
"include_tokens": include_tokens,
|
||||
}
|
||||
)
|
||||
|
||||
def _action_open_run(self):
|
||||
"""Open the current import run in form view.
|
||||
|
||||
Returns:
|
||||
dict: Window action for the current run.
|
||||
"""
|
||||
self.ensure_one()
|
||||
return {
|
||||
"type": "ir.actions.act_window",
|
||||
"name": _("Scryfall Import Run"),
|
||||
"res_model": "mvd.tcg.mtg.scryfall.import.run",
|
||||
"view_mode": "form",
|
||||
"res_id": self.id,
|
||||
"target": "current",
|
||||
}
|
||||
|
||||
def _write_progress(self, **values):
|
||||
"""Write progress fields and recompute the progress percentage.
|
||||
|
||||
Args:
|
||||
**values: Run field values to write.
|
||||
"""
|
||||
self.ensure_one()
|
||||
current = values.get("progress_current", self.progress_current or 0)
|
||||
total = values.get("progress_total", self.progress_total or 0)
|
||||
percent = 0.0
|
||||
if total:
|
||||
percent = min(100.0, round((current / total) * 100.0, 2))
|
||||
values["progress_percent"] = percent
|
||||
self.write(values)
|
||||
|
||||
def _append_log(self, message):
|
||||
"""Append one timestamped line to the run log.
|
||||
|
||||
Args:
|
||||
message: Human-readable log message.
|
||||
"""
|
||||
self.ensure_one()
|
||||
timestamp = fields.Datetime.now()
|
||||
line = f"[{timestamp}] {message}"
|
||||
self.write(
|
||||
{
|
||||
"log_text": ((self.log_text or "") + ("\n" if self.log_text else "") + line),
|
||||
}
|
||||
)
|
||||
|
||||
def _normalize_run_languages(self):
|
||||
"""Return the normalized language scope for the current run.
|
||||
|
||||
Returns:
|
||||
tuple[str, ...]: Deduplicated Scryfall language codes.
|
||||
"""
|
||||
self.ensure_one()
|
||||
return normalize_language_codes(self.language_codes)
|
||||
|
||||
def _prepare_batch_entries(self):
|
||||
"""Return normalized batch entries for the current run.
|
||||
|
||||
Returns:
|
||||
list[dict[str, str]]: Parsed batch entries.
|
||||
"""
|
||||
self.ensure_one()
|
||||
return self.env["mvd.tcg.mtg.scryfall.api"].parse_batch_queries(
|
||||
self.input_text,
|
||||
default_lookup_mode=self.lookup_mode or "url",
|
||||
)
|
||||
|
||||
def _prepare_set_codes(self):
|
||||
"""Return normalized set codes for the current run.
|
||||
|
||||
Returns:
|
||||
list[str]: Parsed set codes.
|
||||
"""
|
||||
self.ensure_one()
|
||||
return self.env["mvd.tcg.mtg.scryfall.api"].parse_set_codes(self.input_text)
|
||||
|
||||
def _count_card_operation(self, primary_payload, cards_created, cards_updated):
|
||||
"""Update card counters for one upsert candidate.
|
||||
|
||||
Args:
|
||||
primary_payload: Primary payload of the imported print group.
|
||||
cards_created: Current created counter.
|
||||
cards_updated: Current updated counter.
|
||||
|
||||
Returns:
|
||||
tuple[int, int]: Updated card counters.
|
||||
"""
|
||||
self.ensure_one()
|
||||
card_model = self.env["mvd.tcg.card"]
|
||||
external_ref = card_model._mtg_scryfall_build_external_ref(primary_payload)
|
||||
mtg_game = self.env["mvd.tcg.game"]._mvd_tcg_get_mtg_game()
|
||||
existing_card = card_model.search(
|
||||
[("game_id", "=", mtg_game.id), ("external_ref", "=", external_ref)],
|
||||
limit=1,
|
||||
)
|
||||
if existing_card:
|
||||
cards_updated += 1
|
||||
else:
|
||||
cards_created += 1
|
||||
return cards_created, cards_updated
|
||||
|
||||
def _count_set_operation(self, primary_payload, seen_set_codes, sets_created, sets_updated):
|
||||
"""Update set counters for one upsert candidate.
|
||||
|
||||
Args:
|
||||
primary_payload: Primary payload of the imported print group.
|
||||
seen_set_codes: Set codes already counted in the current run.
|
||||
sets_created: Current created counter.
|
||||
sets_updated: Current updated counter.
|
||||
|
||||
Returns:
|
||||
tuple[set[str], int, int]: Updated set-code cache and counters.
|
||||
"""
|
||||
self.ensure_one()
|
||||
set_code = (primary_payload.get("set") or "").strip().lower()
|
||||
if not set_code or set_code in seen_set_codes:
|
||||
return seen_set_codes, sets_created, sets_updated
|
||||
|
||||
mtg_game = self.env["mvd.tcg.game"]._mvd_tcg_get_mtg_game()
|
||||
existing_set = self.env["mvd.tcg.mtg.set"].search(
|
||||
[("game_id", "=", mtg_game.id), ("code", "=", set_code)],
|
||||
limit=1,
|
||||
)
|
||||
if existing_set:
|
||||
sets_updated += 1
|
||||
else:
|
||||
sets_created += 1
|
||||
seen_set_codes.add(set_code)
|
||||
return seen_set_codes, sets_created, sets_updated
|
||||
|
||||
def _import_payload_groups(self, grouped_payloads, *, label_getter):
|
||||
"""Import grouped payloads and update run counters.
|
||||
|
||||
Args:
|
||||
grouped_payloads: Localized payload groups to import.
|
||||
label_getter: Callable returning a human-readable item label.
|
||||
"""
|
||||
self.ensure_one()
|
||||
card_model = self.env["mvd.tcg.card"]
|
||||
cards_created = 0
|
||||
cards_updated = 0
|
||||
sets_created = 0
|
||||
sets_updated = 0
|
||||
seen_set_codes = set()
|
||||
total_groups = len(grouped_payloads)
|
||||
|
||||
self._write_progress(
|
||||
stage="import",
|
||||
progress_current=0,
|
||||
progress_total=max(total_groups, 1),
|
||||
item_count=total_groups,
|
||||
set_count=0,
|
||||
cards_created=0,
|
||||
cards_updated=0,
|
||||
sets_created=0,
|
||||
sets_updated=0,
|
||||
)
|
||||
for index, payloads in enumerate(grouped_payloads, start=1):
|
||||
primary_payload = card_model._mtg_scryfall_select_primary_payload(payloads)
|
||||
cards_created, cards_updated = self._count_card_operation(
|
||||
primary_payload,
|
||||
cards_created,
|
||||
cards_updated,
|
||||
)
|
||||
seen_set_codes, sets_created, sets_updated = self._count_set_operation(
|
||||
primary_payload,
|
||||
seen_set_codes,
|
||||
sets_created,
|
||||
sets_updated,
|
||||
)
|
||||
card_model.mtg_scryfall_upsert_group_from_payloads(
|
||||
payloads,
|
||||
import_run=self,
|
||||
)
|
||||
if index == 1 or index == total_groups or index % 25 == 0:
|
||||
self._append_log(
|
||||
_("Imported %s (%s/%s).")
|
||||
% (
|
||||
label_getter(primary_payload),
|
||||
index,
|
||||
total_groups,
|
||||
)
|
||||
)
|
||||
self._write_progress(
|
||||
stage="import",
|
||||
progress_current=index,
|
||||
progress_total=max(total_groups, 1),
|
||||
set_count=len(seen_set_codes),
|
||||
cards_created=cards_created,
|
||||
cards_updated=cards_updated,
|
||||
sets_created=sets_created,
|
||||
sets_updated=sets_updated,
|
||||
)
|
||||
|
||||
def _run_set_import(self):
|
||||
"""Execute one controlled MTG set import."""
|
||||
self.ensure_one()
|
||||
scryfall_api = self.env["mvd.tcg.mtg.scryfall.api"]
|
||||
set_codes = self._prepare_set_codes()
|
||||
if not set_codes:
|
||||
raise UserError(_("Enter at least one valid set code first."))
|
||||
languages = self._normalize_run_languages()
|
||||
max_cards_per_set = self.max_cards_per_set or None
|
||||
grouped_payloads = []
|
||||
|
||||
self._write_progress(stage="fetch", set_count=len(set_codes))
|
||||
for set_code in set_codes:
|
||||
self._append_log(_("Fetching set %s from Scryfall.") % set_code.upper())
|
||||
set_groups = scryfall_api.get_set_print_group_payloads(
|
||||
set_code,
|
||||
languages=languages,
|
||||
limit=max_cards_per_set,
|
||||
include_tokens=self.include_tokens,
|
||||
)
|
||||
self._append_log(
|
||||
_("Fetched %s print groups for set %s.")
|
||||
% (len(set_groups), set_code.upper())
|
||||
)
|
||||
grouped_payloads.extend(set_groups)
|
||||
|
||||
self._import_payload_groups(
|
||||
grouped_payloads,
|
||||
label_getter=lambda primary_payload: "%s #%s"
|
||||
% (
|
||||
(primary_payload.get("set") or "").upper(),
|
||||
primary_payload.get("collector_number") or "?",
|
||||
),
|
||||
)
|
||||
|
||||
def _run_batch_import(self):
|
||||
"""Execute one controlled MTG batch import."""
|
||||
self.ensure_one()
|
||||
scryfall_api = self.env["mvd.tcg.mtg.scryfall.api"]
|
||||
batch_entries = self._prepare_batch_entries()
|
||||
if not batch_entries:
|
||||
raise UserError(_("Enter at least one valid batch lookup first."))
|
||||
languages = self._normalize_run_languages()
|
||||
grouped_payloads = []
|
||||
|
||||
self._write_progress(
|
||||
stage="fetch",
|
||||
progress_current=0,
|
||||
progress_total=max(len(batch_entries), 1),
|
||||
item_count=len(batch_entries),
|
||||
)
|
||||
for index, entry in enumerate(batch_entries, start=1):
|
||||
self._append_log(
|
||||
_("Resolving %s lookup: %s")
|
||||
% (entry["lookup_mode"], entry["query"])
|
||||
)
|
||||
grouped_payloads.append(
|
||||
scryfall_api.lookup_card_payloads(
|
||||
entry["query"],
|
||||
entry["lookup_mode"],
|
||||
languages=languages,
|
||||
)
|
||||
)
|
||||
self._write_progress(
|
||||
stage="fetch",
|
||||
progress_current=index,
|
||||
progress_total=max(len(batch_entries), 1),
|
||||
)
|
||||
|
||||
self._import_payload_groups(
|
||||
grouped_payloads,
|
||||
label_getter=lambda primary_payload: primary_payload.get("name")
|
||||
or primary_payload.get("printed_name")
|
||||
or primary_payload.get("id")
|
||||
or _("Card"),
|
||||
)
|
||||
|
||||
def action_execute_import(self):
|
||||
"""Run the current import and open the final run record.
|
||||
|
||||
Returns:
|
||||
dict: Form action for the current import run.
|
||||
"""
|
||||
self.ensure_one()
|
||||
self.env["mvd.tcg.mtg.scryfall.api"]._mtg_scryfall_check_manager_access()
|
||||
|
||||
self._write_progress(
|
||||
state="running",
|
||||
stage="prepare",
|
||||
started_at=fields.Datetime.now(),
|
||||
finished_at=False,
|
||||
error_message=False,
|
||||
log_text=False,
|
||||
progress_current=0,
|
||||
progress_total=0,
|
||||
item_count=0,
|
||||
set_count=0,
|
||||
cards_created=0,
|
||||
cards_updated=0,
|
||||
sets_created=0,
|
||||
sets_updated=0,
|
||||
)
|
||||
self._append_log(_("Started %s.") % dict(self._fields["import_mode"].selection).get(self.import_mode))
|
||||
|
||||
try:
|
||||
if self.import_mode == "set":
|
||||
self._run_set_import()
|
||||
else:
|
||||
self._run_batch_import()
|
||||
self._write_progress(
|
||||
state="done",
|
||||
stage="finished",
|
||||
finished_at=fields.Datetime.now(),
|
||||
)
|
||||
self._append_log(_("Finished import successfully."))
|
||||
except Exception as exc: # noqa: BLE001
|
||||
self._append_log(_("Import failed: %s") % exc)
|
||||
self._write_progress(
|
||||
state="failed",
|
||||
stage="failed",
|
||||
finished_at=fields.Datetime.now(),
|
||||
error_message=str(exc),
|
||||
)
|
||||
|
||||
return self._action_open_run()
|
||||
|
||||
def action_open_cards(self):
|
||||
"""Open MTG cards imported by the current run.
|
||||
|
||||
Returns:
|
||||
dict: Window action filtered on imported cards.
|
||||
"""
|
||||
self.ensure_one()
|
||||
action = self.env["ir.actions.actions"]._for_xml_id(
|
||||
"mvd_tcg_mtg.mvd_tcg_mtg_card_action"
|
||||
)
|
||||
action["domain"] = [("mtg_scryfall_last_import_run_id", "=", self.id)]
|
||||
return action
|
||||
|
||||
def action_open_sets(self):
|
||||
"""Open MTG sets touched by the current run.
|
||||
|
||||
Returns:
|
||||
dict: Window action filtered on imported sets.
|
||||
"""
|
||||
self.ensure_one()
|
||||
action = self.env["ir.actions.actions"]._for_xml_id(
|
||||
"mvd_tcg_mtg.mvd_tcg_mtg_set_action"
|
||||
)
|
||||
action["domain"] = [("mtg_scryfall_last_import_run_id", "=", self.id)]
|
||||
return action
|
||||
98
models/mvd_tcg_mtg_set.py
Normal file
98
models/mvd_tcg_mtg_set.py
Normal file
@@ -0,0 +1,98 @@
|
||||
"""Scryfall-specific MTG set helpers for the MTG connector."""
|
||||
|
||||
from odoo import api, fields, models
|
||||
|
||||
|
||||
class MvdTcgMtgSet(models.Model):
|
||||
"""Extend MTG sets with Scryfall import helpers."""
|
||||
|
||||
_inherit = "mvd.tcg.mtg.set"
|
||||
|
||||
mtg_scryfall_last_import_run_id = fields.Many2one(
|
||||
"mvd.tcg.mtg.scryfall.import.run",
|
||||
copy=False,
|
||||
index=True,
|
||||
ondelete="set null",
|
||||
readonly=True,
|
||||
)
|
||||
mtg_scryfall_last_synced_at = fields.Datetime(
|
||||
copy=False,
|
||||
readonly=True,
|
||||
)
|
||||
|
||||
def action_open_mtg_scryfall_last_import_run(self):
|
||||
"""Open the most recent Scryfall import run linked to this set.
|
||||
|
||||
Returns:
|
||||
dict | bool: Window action or ``False`` when no run is linked.
|
||||
"""
|
||||
self.ensure_one()
|
||||
if not self.mtg_scryfall_last_import_run_id:
|
||||
return False
|
||||
|
||||
return {
|
||||
"type": "ir.actions.act_window",
|
||||
"name": self.mtg_scryfall_last_import_run_id.display_name,
|
||||
"res_model": "mvd.tcg.mtg.scryfall.import.run",
|
||||
"view_mode": "form",
|
||||
"res_id": self.mtg_scryfall_last_import_run_id.id,
|
||||
"target": "current",
|
||||
}
|
||||
|
||||
def action_mtg_scryfall_refresh_set(self):
|
||||
"""Refresh the current MTG set from the Scryfall connector.
|
||||
|
||||
Returns:
|
||||
dict: Window action for the created refresh run.
|
||||
"""
|
||||
self.ensure_one()
|
||||
self.env["mvd.tcg.mtg.scryfall.api"]._mtg_scryfall_check_manager_access()
|
||||
refresh_run = self.env["mvd.tcg.mtg.scryfall.import.run"].create_set_refresh_run(
|
||||
self,
|
||||
language_codes=self.env["mvd.tcg.mtg.scryfall.api"].get_import_language_codes(),
|
||||
max_cards_per_set=self.env["mvd.tcg.mtg.scryfall.api"].get_default_max_cards_per_set(),
|
||||
include_tokens=self.env["mvd.tcg.mtg.scryfall.api"].get_default_include_tokens(),
|
||||
)
|
||||
return refresh_run.action_execute_import()
|
||||
|
||||
@api.model
|
||||
def mtg_scryfall_upsert_from_payload(self, payload, import_run=None):
|
||||
"""Create or update one MTG set from a Scryfall card payload.
|
||||
|
||||
Args:
|
||||
payload: Raw Scryfall card payload containing set metadata.
|
||||
import_run: Optional import run record for traceability.
|
||||
|
||||
Returns:
|
||||
mvd.tcg.mtg.set: Upserted MTG set record.
|
||||
"""
|
||||
set_code = (payload.get("set") or "").strip().lower()
|
||||
if not set_code:
|
||||
return self.browse()
|
||||
|
||||
mtg_game = self.env["mvd.tcg.game"]._mvd_tcg_get_mtg_game()
|
||||
values = {
|
||||
"active": True,
|
||||
"code": set_code,
|
||||
"game_id": mtg_game.id,
|
||||
"name": payload.get("set_name") or set_code.upper(),
|
||||
"released_on": payload.get("released_at") or False,
|
||||
"set_type": payload.get("set_type") or False,
|
||||
"official_card_count": payload.get("set_card_count") or 0,
|
||||
"icon_svg_uri": payload.get("set_uri") or payload.get("scryfall_set_uri") or False,
|
||||
}
|
||||
if import_run:
|
||||
values.update(
|
||||
{
|
||||
"mtg_scryfall_last_import_run_id": import_run.id,
|
||||
"mtg_scryfall_last_synced_at": fields.Datetime.now(),
|
||||
}
|
||||
)
|
||||
mtg_set = self.search(
|
||||
[("game_id", "=", mtg_game.id), ("code", "=", set_code)],
|
||||
limit=1,
|
||||
)
|
||||
if mtg_set:
|
||||
mtg_set.write(values)
|
||||
return mtg_set
|
||||
return self.create(values)
|
||||
59
models/res_config_settings.py
Normal file
59
models/res_config_settings.py
Normal file
@@ -0,0 +1,59 @@
|
||||
"""Settings for the MTG Scryfall connector."""
|
||||
|
||||
from odoo import api, fields, models
|
||||
from odoo.exceptions import UserError, ValidationError
|
||||
|
||||
from .constants import (
|
||||
DEFAULT_SCRYFALL_API_BASE_URL,
|
||||
DEFAULT_SCRYFALL_IMPORT_LANGUAGES,
|
||||
DEFAULT_SCRYFALL_TIMEOUT_SECONDS,
|
||||
DEFAULT_SCRYFALL_USER_AGENT,
|
||||
)
|
||||
|
||||
|
||||
class ResConfigSettings(models.TransientModel):
|
||||
"""Expose Scryfall connector defaults through Odoo settings."""
|
||||
|
||||
_inherit = "res.config.settings"
|
||||
|
||||
mtg_scryfall_api_base_url = fields.Char(
|
||||
string="Scryfall API Base URL",
|
||||
config_parameter="mvd_tcg_mtg_scryfall.api_base_url",
|
||||
default=DEFAULT_SCRYFALL_API_BASE_URL,
|
||||
)
|
||||
mtg_scryfall_timeout_seconds = fields.Integer(
|
||||
string="Scryfall Request Timeout (s)",
|
||||
config_parameter="mvd_tcg_mtg_scryfall.timeout_seconds",
|
||||
default=DEFAULT_SCRYFALL_TIMEOUT_SECONDS,
|
||||
)
|
||||
mtg_scryfall_user_agent = fields.Char(
|
||||
string="User Agent",
|
||||
config_parameter="mvd_tcg_mtg_scryfall.user_agent",
|
||||
default=DEFAULT_SCRYFALL_USER_AGENT,
|
||||
)
|
||||
mtg_scryfall_import_language_codes = fields.Char(
|
||||
string="Default Import Languages",
|
||||
config_parameter="mvd_tcg_mtg_scryfall.import_language_codes",
|
||||
default=",".join(DEFAULT_SCRYFALL_IMPORT_LANGUAGES),
|
||||
)
|
||||
mtg_scryfall_import_max_cards_per_set = fields.Integer(
|
||||
string="Default Card Limit per Set",
|
||||
config_parameter="mvd_tcg_mtg_scryfall.import_max_cards_per_set",
|
||||
default=0,
|
||||
)
|
||||
mtg_scryfall_import_include_tokens = fields.Boolean(
|
||||
string="Include Tokens by Default",
|
||||
config_parameter="mvd_tcg_mtg_scryfall.import_include_tokens",
|
||||
default=False,
|
||||
)
|
||||
|
||||
@api.constrains("mtg_scryfall_api_base_url")
|
||||
def _check_mtg_scryfall_api_base_url(self):
|
||||
"""Restrict the connector to trusted HTTPS Scryfall API hosts."""
|
||||
for settings in self:
|
||||
try:
|
||||
self.env["mvd.tcg.mtg.scryfall.api"]._validate_api_url(
|
||||
settings.mtg_scryfall_api_base_url
|
||||
)
|
||||
except UserError as exc:
|
||||
raise ValidationError(str(exc)) from exc
|
||||
Reference in New Issue
Block a user