Files
mvd_tcg_mtg_deck_openai/models/mvd_tcg_deck.py
2026-04-03 23:08:58 +02:00

2104 lines
83 KiB
Python

"""OpenAI-powered MTG deck analysis."""
import html
import json
import os
import time
from urllib import error, parse, request
from odoo import _, fields, models
from odoo.exceptions import UserError
from .constants import (
DEFAULT_MTG_OPENAI_API_BASE_URL,
DEFAULT_MTG_OPENAI_ALTERNATIVE_CANDIDATE_LIMIT,
DEFAULT_MTG_OPENAI_FILL_BATCH_SIZE,
DEFAULT_MTG_OPENAI_FILL_CANDIDATE_LIMIT,
DEFAULT_MTG_OPENAI_MODEL_NAME,
DEFAULT_MTG_OPENAI_REQUEST_TIMEOUT_SECONDS,
DEFAULT_MTG_OPENAI_ROLE_BATCH_SIZE,
)
class MvdTcgDeck(models.Model):
"""Extend MTG decks with on-demand OpenAI analysis fields."""
_inherit = "mvd.tcg.deck"
_MTG_OPENAI_ALLOWED_BASE_HOSTS = frozenset({"api.openai.com"})
_MTG_OPENAI_BATCH_SEPARATOR = "\n\n--- batch ---\n\n"
_MTG_OPENAI_STATE_SELECTION = [
("empty", "Not Analyzed"),
("done", "Ready"),
("failed", "Failed"),
]
_MTG_OPENAI_ROLE_BATCH_SIZE = DEFAULT_MTG_OPENAI_ROLE_BATCH_SIZE
_MTG_OPENAI_FILL_CANDIDATE_LIMIT = DEFAULT_MTG_OPENAI_FILL_CANDIDATE_LIMIT
_MTG_OPENAI_FILL_BATCH_SIZE = DEFAULT_MTG_OPENAI_FILL_BATCH_SIZE
_MTG_OPENAI_ALTERNATIVE_CANDIDATE_LIMIT = DEFAULT_MTG_OPENAI_ALTERNATIVE_CANDIDATE_LIMIT
_MTG_OPENAI_RUN_FIELD_MAP = {
"analysis": {
"state": "mtg_openai_analysis_state",
"error": "mtg_openai_last_error",
"analyzed_at": "mtg_openai_last_analyzed_at",
"model": "mtg_openai_last_model",
"lang": "mtg_openai_last_lang",
"prompt": "mtg_openai_last_prompt",
"response": "mtg_openai_last_response",
},
"role": {
"state": "mtg_openai_role_analysis_state",
"error": "mtg_openai_role_last_error",
"analyzed_at": "mtg_openai_role_last_analyzed_at",
"model": "mtg_openai_role_last_model",
"lang": "mtg_openai_role_last_lang",
"prompt": "mtg_openai_role_last_prompt",
"response": "mtg_openai_role_last_response",
},
"alternative": {
"state": "mtg_openai_alternative_state",
"error": "mtg_openai_alternative_last_error",
"analyzed_at": "mtg_openai_alternative_last_analyzed_at",
"model": "mtg_openai_alternative_last_model",
"lang": "mtg_openai_alternative_last_lang",
"prompt": "mtg_openai_alternative_last_prompt",
"response": "mtg_openai_alternative_last_response",
},
"fill": {
"state": "mtg_openai_fill_state",
"error": "mtg_openai_fill_last_error",
"analyzed_at": "mtg_openai_fill_last_analyzed_at",
"model": "mtg_openai_fill_last_model",
"lang": "mtg_openai_fill_last_lang",
"prompt": "mtg_openai_fill_last_prompt",
"response": "mtg_openai_fill_last_response",
},
}
mtg_openai_analysis_state = fields.Selection(
selection=_MTG_OPENAI_STATE_SELECTION,
string="Deck Analysis Status",
default="empty",
readonly=True,
copy=False,
)
mtg_openai_last_analyzed_at = fields.Datetime(
string="Last Deck Analysis",
readonly=True,
copy=False,
)
mtg_openai_last_model = fields.Char(
string="Analysis Model",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_last_lang = fields.Char(
string="Analysis Language",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_commander_summary = fields.Html(
string="Commander Summary",
readonly=True,
copy=False,
)
mtg_openai_gameplan = fields.Html(
string="Game Plan",
readonly=True,
copy=False,
)
mtg_openai_pilot_tips = fields.Html(
string="Pilot Tips",
readonly=True,
copy=False,
)
mtg_openai_risk_notes = fields.Html(
string="Risk Notes",
readonly=True,
copy=False,
)
mtg_openai_last_error = fields.Text(
string="Last Analysis Error",
readonly=True,
copy=False,
)
mtg_openai_last_prompt = fields.Text(
string="Last Analysis Prompt",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_last_response = fields.Text(
string="Last Analysis Response",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_role_analysis_state = fields.Selection(
selection=_MTG_OPENAI_STATE_SELECTION,
string="Role Analysis Status",
default="empty",
readonly=True,
copy=False,
)
mtg_openai_role_last_analyzed_at = fields.Datetime(
string="Last Role Analysis",
readonly=True,
copy=False,
)
mtg_openai_role_last_model = fields.Char(
string="Role Analysis Model",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_role_last_lang = fields.Char(
string="Role Analysis Language",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_role_last_error = fields.Text(
string="Last Role Analysis Error",
readonly=True,
copy=False,
)
mtg_openai_role_last_prompt = fields.Text(
string="Last Role Analysis Prompt",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_role_last_response = fields.Text(
string="Last Role Analysis Response",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_alternative_state = fields.Selection(
selection=_MTG_OPENAI_STATE_SELECTION,
string="Alternative Analysis Status",
default="empty",
readonly=True,
copy=False,
)
mtg_openai_alternative_last_analyzed_at = fields.Datetime(
string="Last Alternative Analysis",
readonly=True,
copy=False,
)
mtg_openai_alternative_last_model = fields.Char(
string="Alternative Analysis Model",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_alternative_last_lang = fields.Char(
string="Alternative Analysis Language",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_alternative_suggestions = fields.Html(
string="Alternative Suggestions",
readonly=True,
copy=False,
)
mtg_openai_alternative_last_error = fields.Text(
string="Last Alternative Analysis Error",
readonly=True,
copy=False,
)
mtg_openai_alternative_last_prompt = fields.Text(
string="Last Alternative Analysis Prompt",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_alternative_last_response = fields.Text(
string="Last Alternative Analysis Response",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_fill_state = fields.Selection(
selection=_MTG_OPENAI_STATE_SELECTION,
string="Deck Fill Status",
default="empty",
readonly=True,
copy=False,
)
mtg_openai_fill_last_analyzed_at = fields.Datetime(
string="Last Deck Fill",
readonly=True,
copy=False,
)
mtg_openai_fill_last_model = fields.Char(
string="Deck Fill Model",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_fill_last_lang = fields.Char(
string="Deck Fill Language",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_fill_summary = fields.Html(
string="Deck Fill Summary",
readonly=True,
copy=False,
)
mtg_openai_fill_last_error = fields.Text(
string="Last Deck Fill Error",
readonly=True,
copy=False,
)
mtg_openai_fill_last_prompt = fields.Text(
string="Last Deck Fill Prompt",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
mtg_openai_fill_last_response = fields.Text(
string="Last Deck Fill Response",
readonly=True,
copy=False,
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
)
def _mtg_openai_check_manager_access(self):
"""Require manager-level rights for OpenAI-triggering actions.
Returns:
bool: ``True`` when the current user may trigger OpenAI flows.
"""
if self.env.is_superuser() or any(
self.env.user.has_group(xmlid)
for xmlid in (
"mvd_tcg_base.mvd_tcg_base_group_manager",
"base.group_system",
)
):
return True
raise UserError(
_("Only TCG managers can run OpenAI deck analysis actions.")
)
@classmethod
def _mtg_openai_validate_base_url(cls, url):
"""Validate the configured OpenAI base URL.
Args:
url: Absolute OpenAI API base URL.
Returns:
str: Validated absolute base URL without a trailing slash.
Raises:
UserError: If the URL does not target the official HTTPS API host.
"""
normalized_url = (url or "").strip().rstrip("/")
parsed_url = parse.urlparse(normalized_url)
hostname = (parsed_url.hostname or "").lower()
if parsed_url.scheme != "https" or hostname not in cls._MTG_OPENAI_ALLOWED_BASE_HOSTS:
raise UserError(
_("OpenAI API Base URL must use HTTPS and the official api.openai.com host.")
)
return normalized_url
def action_mtg_openai_analyze_deck(self):
"""Analyze one MTG deck with OpenAI and persist the result.
The analysis language follows the current user language, while the
Oracle text payload remains pinned to ``en_US`` for rules accuracy.
"""
self.ensure_one()
self._mtg_openai_check_manager_access()
if not self.is_mtg_deck:
raise UserError(_("OpenAI deck analysis is only available for Magic decks."))
api_key = self._mtg_openai_get_api_key()
model_name = self._mtg_openai_get_model_name()
analysis_lang = self._mtg_openai_get_analysis_lang()
prompt = self._mtg_openai_build_prompt(analysis_lang)
try:
analysis_payload, raw_response = self._mtg_openai_request_analysis(
api_key=api_key,
model_name=model_name,
prompt=prompt,
)
except UserError as exc:
self.write(
{
"mtg_openai_analysis_state": "failed",
"mtg_openai_last_error": str(exc),
"mtg_openai_last_analyzed_at": fields.Datetime.now(),
"mtg_openai_last_model": model_name,
"mtg_openai_last_lang": analysis_lang,
"mtg_openai_last_prompt": prompt,
}
)
raise
self.write(
{
"mtg_openai_analysis_state": "done",
"mtg_openai_last_error": False,
"mtg_openai_last_analyzed_at": fields.Datetime.now(),
"mtg_openai_last_model": model_name,
"mtg_openai_last_lang": analysis_lang,
"mtg_openai_last_prompt": prompt,
"mtg_openai_last_response": raw_response,
"mtg_openai_commander_summary": self._mtg_openai_render_paragraphs(
analysis_payload.get("commander_summary")
),
"mtg_openai_gameplan": self._mtg_openai_render_bullets(
analysis_payload.get("gameplan_bullets")
),
"mtg_openai_pilot_tips": self._mtg_openai_render_bullets(
analysis_payload.get("pilot_tips")
),
"mtg_openai_risk_notes": self._mtg_openai_render_bullets(
analysis_payload.get("risk_notes")
),
}
)
return {
"type": "ir.actions.act_window",
"res_model": "mvd.tcg.deck",
"res_id": self.id,
"view_mode": "form",
"target": "current",
}
def action_mtg_openai_analyze_card_roles(self):
"""Analyze role tags for the cards of one MTG deck.
The analysis language follows the active user language, while
``oracle_text`` remains pinned to ``en_US`` for rules accuracy.
Returns:
dict: Window action that reloads the current deck form.
"""
self.ensure_one()
self._mtg_openai_check_manager_access()
if not self.is_mtg_deck:
raise UserError(_("OpenAI card analysis is only available for Magic decks."))
api_key = self._mtg_openai_get_api_key()
model_name = self._mtg_openai_get_model_name()
analysis_lang = self._mtg_openai_get_analysis_lang()
role_map = self._mtg_openai_get_role_key_map()
card_payload = self._mtg_openai_collect_role_payload()
if not card_payload:
raise UserError(_("This deck has no cards to analyze."))
batch_size = self._mtg_openai_get_role_batch_size()
prompt_batches = []
response_batches = []
pending_updates = []
try:
for batch_start in range(0, len(card_payload), batch_size):
batch_payload = card_payload[batch_start:batch_start + batch_size]
prompt = self._mtg_openai_build_role_prompt(
analysis_lang=analysis_lang,
role_map=role_map,
batch_payload=batch_payload,
)
prompt_batches.append(prompt)
role_payload, raw_response = self._mtg_openai_request_role_analysis(
api_key=api_key,
model_name=model_name,
prompt=prompt,
)
response_batches.append(raw_response)
pending_updates.extend(
self._mtg_openai_prepare_role_updates(
batch_payload=batch_payload,
role_payload=role_payload,
role_map=role_map,
)
)
except UserError as exc:
self.write(
{
"mtg_openai_role_analysis_state": "failed",
"mtg_openai_role_last_error": str(exc),
"mtg_openai_role_last_analyzed_at": fields.Datetime.now(),
"mtg_openai_role_last_model": model_name,
"mtg_openai_role_last_lang": analysis_lang,
"mtg_openai_role_last_prompt": "\n\n--- batch ---\n\n".join(
prompt_batches
),
"mtg_openai_role_last_response": "\n\n--- batch ---\n\n".join(
response_batches
) or False,
}
)
raise
for values in pending_updates:
line = self.env["mvd.tcg.deck.line"].browse(values["line_id"])
line.write(
{
"role_ids": [(6, 0, values["role_ids"])],
"mtg_openai_role_rationale": values["rationale"],
}
)
self.write(
{
"mtg_openai_role_analysis_state": "done",
"mtg_openai_role_last_error": False,
"mtg_openai_role_last_analyzed_at": fields.Datetime.now(),
"mtg_openai_role_last_model": model_name,
"mtg_openai_role_last_lang": analysis_lang,
"mtg_openai_role_last_prompt": "\n\n--- batch ---\n\n".join(
prompt_batches
),
"mtg_openai_role_last_response": "\n\n--- batch ---\n\n".join(
response_batches
),
}
)
return {
"type": "ir.actions.act_window",
"res_model": "mvd.tcg.deck",
"res_id": self.id,
"view_mode": "form",
"target": "current",
}
def action_mtg_openai_suggest_alternatives(self):
"""Suggest replacement cards for current MTG issue lines."""
self.ensure_one()
self._mtg_openai_check_manager_access()
if not self.is_mtg_deck:
raise UserError(
_("OpenAI alternative suggestions are only available for Magic decks.")
)
issue_payload = self._mtg_openai_collect_alternative_payload()
if not issue_payload:
raise UserError(_("This deck has no replaceable issue lines right now."))
api_key = self._mtg_openai_get_api_key()
model_name = self._mtg_openai_get_model_name()
analysis_lang = self._mtg_openai_get_analysis_lang()
prompt = self._mtg_openai_build_alternative_prompt(
analysis_lang=analysis_lang,
issue_payload=issue_payload,
)
try:
alternative_payload, raw_response = self._mtg_openai_request_alternative_analysis(
api_key=api_key,
model_name=model_name,
prompt=prompt,
)
prepared_suggestions = self._mtg_openai_prepare_alternative_suggestions(
issue_payload=issue_payload,
alternative_payload=alternative_payload,
)
except UserError as exc:
self.write(
{
"mtg_openai_alternative_state": "failed",
"mtg_openai_alternative_last_error": str(exc),
"mtg_openai_alternative_last_analyzed_at": fields.Datetime.now(),
"mtg_openai_alternative_last_model": model_name,
"mtg_openai_alternative_last_lang": analysis_lang,
"mtg_openai_alternative_last_prompt": prompt,
}
)
raise
self.write(
{
"mtg_openai_alternative_state": "done",
"mtg_openai_alternative_last_error": False,
"mtg_openai_alternative_last_analyzed_at": fields.Datetime.now(),
"mtg_openai_alternative_last_model": model_name,
"mtg_openai_alternative_last_lang": analysis_lang,
"mtg_openai_alternative_last_prompt": prompt,
"mtg_openai_alternative_last_response": raw_response,
"mtg_openai_alternative_suggestions": self._mtg_openai_render_alternative_suggestions(
prepared_suggestions
),
}
)
return {
"type": "ir.actions.act_window",
"res_model": "mvd.tcg.deck",
"res_id": self.id,
"view_mode": "form",
"target": "current",
}
def action_mtg_openai_fill_deck(self):
"""Fill missing mainboard slots from the in-system MTG card pool."""
self.ensure_one()
self._mtg_openai_check_manager_access()
if not self.is_mtg_deck:
raise UserError(_("OpenAI deck fill is only available for Magic decks."))
if not self.mtg_commander_card_id:
raise UserError(_("Select at least one commander card first."))
if not self.mtg_mainboard_board_id:
raise UserError(_("This deck does not have a mainboard yet."))
if not self.mtg_expected_mainboard_size:
raise UserError(_("Select a supported MTG format before using deck fill."))
remaining_slots = self.mtg_expected_mainboard_size - self.mtg_mainboard_count
if remaining_slots <= 0:
raise UserError(_("The mainboard is already full for the selected format."))
api_key = self._mtg_openai_get_api_key()
model_name = self._mtg_openai_get_model_name()
analysis_lang = self._mtg_openai_get_analysis_lang()
rendered_batches = []
prompt_batches = []
response_batches = []
total_added = 0
try:
while remaining_slots > 0:
candidate_payload = self._mtg_openai_collect_fill_candidate_payload(
limit=self._mtg_openai_get_fill_candidate_limit()
)
if not candidate_payload:
break
batch_size = min(
remaining_slots,
self._mtg_openai_get_fill_batch_size(),
len(candidate_payload),
)
prompt = self._mtg_openai_build_fill_prompt(
analysis_lang=analysis_lang,
candidate_payload=candidate_payload,
batch_size=batch_size,
)
prompt_batches.append(prompt)
fill_payload, raw_response = self._mtg_openai_request_fill_selection(
api_key=api_key,
model_name=model_name,
prompt=prompt,
)
response_batches.append(raw_response)
prepared_fill = self._mtg_openai_prepare_fill_selection(
candidate_payload=candidate_payload,
fill_payload=fill_payload,
max_cards=batch_size,
)
if not prepared_fill["cards"]:
break
rendered_batches.append(prepared_fill)
for suggestion in prepared_fill["cards"]:
card = self.env["mvd.tcg.card"].browse(suggestion["card_id"]).exists()
if not card:
continue
self._mtg_openai_add_card_to_board(
card=card,
board=self.mtg_mainboard_board_id,
quantity=suggestion["quantity"],
)
total_added += suggestion["quantity"]
self.invalidate_recordset(
[
"mtg_mainboard_count",
"mtg_rule_warning_count",
"mtg_duplicate_card_count",
]
)
remaining_slots = self.mtg_expected_mainboard_size - self.mtg_mainboard_count
except UserError as exc:
self.write(
{
"mtg_openai_fill_state": "failed",
"mtg_openai_fill_last_error": str(exc),
"mtg_openai_fill_last_analyzed_at": fields.Datetime.now(),
"mtg_openai_fill_last_model": model_name,
"mtg_openai_fill_last_lang": analysis_lang,
"mtg_openai_fill_last_prompt": "\n\n--- batch ---\n\n".join(
prompt_batches
),
"mtg_openai_fill_last_response": "\n\n--- batch ---\n\n".join(
response_batches
) or False,
}
)
raise
summary_markup = self._mtg_openai_render_fill_summary(
fill_batches=rendered_batches,
total_added=total_added,
remaining_slots=max(
0,
self.mtg_expected_mainboard_size - self.mtg_mainboard_count,
),
)
self.write(
{
"mtg_openai_fill_state": "done",
"mtg_openai_fill_last_error": False,
"mtg_openai_fill_last_analyzed_at": fields.Datetime.now(),
"mtg_openai_fill_last_model": model_name,
"mtg_openai_fill_last_lang": analysis_lang,
"mtg_openai_fill_last_prompt": "\n\n--- batch ---\n\n".join(
prompt_batches
),
"mtg_openai_fill_last_response": "\n\n--- batch ---\n\n".join(
response_batches
) or False,
"mtg_openai_fill_summary": summary_markup,
}
)
return {
"type": "ir.actions.act_window",
"res_model": "mvd.tcg.deck",
"res_id": self.id,
"view_mode": "form",
"target": "current",
}
def _mtg_openai_get_api_key(self):
"""Return the configured OpenAI API key.
Returns:
str: The configured OpenAI API key.
"""
config_key = "mvd_tcg_mtg_deck_openai.api_key"
api_key = (self._mtg_openai_get_config_parameter(config_key) or "").strip()
if not api_key:
api_key = (os.getenv("OPENAI_API_KEY") or "").strip()
if not api_key:
raise UserError(_("Configure an OpenAI API key in TCG Settings first."))
return api_key
def _mtg_openai_get_config_parameter(self, name, default=False):
"""Return one OpenAI configuration parameter.
Args:
name: Technical config parameter key.
default: Fallback value when the parameter is unset.
Returns:
str | bool: Stored config value or the provided fallback.
"""
return self.env["ir.config_parameter"].sudo().get_param(name, default)
def _mtg_openai_get_positive_integer_setting(self, name, default):
"""Return one positive integer connector setting.
Args:
name: Technical config parameter key.
default: Fallback integer when the parameter is unset or invalid.
Returns:
int: Positive integer configuration value.
"""
raw_value = self._mtg_openai_get_config_parameter(name, default)
try:
return max(1, int(raw_value))
except (TypeError, ValueError):
return default
def _mtg_openai_get_model_name(self):
"""Return the configured OpenAI model name.
Returns:
str: The OpenAI model identifier.
"""
config_key = "mvd_tcg_mtg_deck_openai.model_name"
model_name = (
self._mtg_openai_get_config_parameter(
config_key,
DEFAULT_MTG_OPENAI_MODEL_NAME,
)
or DEFAULT_MTG_OPENAI_MODEL_NAME
).strip()
if model_name == DEFAULT_MTG_OPENAI_MODEL_NAME:
env_model_name = (os.getenv("OPENAI_MODEL") or "").strip()
if env_model_name:
return env_model_name
return model_name
def _mtg_openai_get_api_base_url(self):
"""Return the configured OpenAI API base URL.
Returns:
str: Base URL for Responses API requests.
"""
return self._mtg_openai_validate_base_url(
(
self._mtg_openai_get_config_parameter(
"mvd_tcg_mtg_deck_openai.api_base_url",
DEFAULT_MTG_OPENAI_API_BASE_URL,
)
or DEFAULT_MTG_OPENAI_API_BASE_URL
)
)
def _mtg_openai_get_request_timeout_seconds(self):
"""Return the configured OpenAI request timeout."""
return self._mtg_openai_get_positive_integer_setting(
"mvd_tcg_mtg_deck_openai.request_timeout_seconds",
DEFAULT_MTG_OPENAI_REQUEST_TIMEOUT_SECONDS,
)
def _mtg_openai_get_role_batch_size(self):
"""Return the configured role-analysis batch size."""
return self._mtg_openai_get_positive_integer_setting(
"mvd_tcg_mtg_deck_openai.role_batch_size",
self._MTG_OPENAI_ROLE_BATCH_SIZE,
)
def _mtg_openai_get_fill_candidate_limit(self):
"""Return the configured fill candidate pool size."""
return self._mtg_openai_get_positive_integer_setting(
"mvd_tcg_mtg_deck_openai.fill_candidate_limit",
self._MTG_OPENAI_FILL_CANDIDATE_LIMIT,
)
def _mtg_openai_get_fill_batch_size(self):
"""Return the configured fill selection batch size."""
return self._mtg_openai_get_positive_integer_setting(
"mvd_tcg_mtg_deck_openai.fill_batch_size",
self._MTG_OPENAI_FILL_BATCH_SIZE,
)
def _mtg_openai_get_alternative_candidate_limit(self):
"""Return the configured alternative suggestion pool size."""
return self._mtg_openai_get_positive_integer_setting(
"mvd_tcg_mtg_deck_openai.alternative_candidate_limit",
self._MTG_OPENAI_ALTERNATIVE_CANDIDATE_LIMIT,
)
def _mtg_openai_get_analysis_lang(self):
"""Return the active analysis language code.
Returns:
str: Odoo language code from context or user preferences.
"""
return (
self.env.context.get("lang")
or self.env.user.lang
or "en_US"
)
def _mtg_openai_get_analysis_language_name(self, lang_code):
"""Resolve a human-readable language name for one Odoo language code.
Args:
lang_code: Odoo language code such as ``de_DE``.
Returns:
str: Display name for prompting, for example ``German``.
"""
language = self.env["res.lang"].sudo().search([("code", "=", lang_code)], limit=1)
return language.name or lang_code or "English"
@classmethod
def _mtg_openai_join_batches(cls, items):
"""Join batched prompts or responses with one stable separator.
Args:
items: Ordered prompt or response chunks.
Returns:
str | bool: Joined payload text, or ``False`` when empty.
"""
cleaned_items = [item for item in items if item]
if not cleaned_items:
return False
return cls._MTG_OPENAI_BATCH_SEPARATOR.join(cleaned_items)
def _mtg_openai_write_run_state(
self,
run_key,
*,
state,
model_name,
analysis_lang,
prompt=False,
response=False,
error_message=False,
extra_values=None,
):
"""Persist one OpenAI run state update on the current deck.
Args:
run_key: Stable OpenAI run key such as ``analysis`` or ``fill``.
state: Target state value.
model_name: OpenAI model identifier used for the run.
analysis_lang: Active Odoo analysis language code.
prompt: Serialized prompt text or batch text.
response: Raw response text or joined batch responses.
error_message: Optional failure message.
extra_values: Optional additional field values to persist.
"""
self.ensure_one()
field_map = self._MTG_OPENAI_RUN_FIELD_MAP[run_key]
values = {
field_map["state"]: state,
field_map["error"]: error_message or False,
field_map["analyzed_at"]: fields.Datetime.now(),
field_map["model"]: model_name,
field_map["lang"]: analysis_lang,
field_map["prompt"]: prompt or False,
field_map["response"]: response or False,
}
if extra_values:
values.update(extra_values)
self.write(values)
@staticmethod
def _mtg_openai_build_faces_payload(card):
"""Serialize all explicit MTG card faces for AI prompts.
Args:
card: MTG card record in a stable language context.
Returns:
list[dict[str, object]]: Ordered face payloads.
"""
return [
{
"name": section["name"],
"mana_cost": section["mana_cost"],
"type_line": section["type_line"],
"oracle_text": section["oracle_text"],
}
for section in card.mtg_get_rules_sections()
]
def _mtg_openai_build_rules_payload(self, card):
"""Return one reusable rules payload for prompts.
Args:
card: MTG card record, in any language context.
Returns:
dict[str, object]: Face-aware rules payload built from English text.
"""
self.ensure_one()
english_card = card.with_context(lang="en_US")
return {
"oracle_text": english_card.mtg_get_rules_summary(),
"face_count": english_card.mtg_face_count,
"faces": self._mtg_openai_build_faces_payload(english_card),
}
def _mtg_openai_build_card_reference_payload(self, card):
"""Return one reusable English card payload for AI prompts."""
self.ensure_one()
english_card = card.with_context(lang="en_US")
return {
"name": english_card.display_name,
"mana_cost": english_card.mtg_mana_cost or False,
"mana_value": english_card.mtg_mana_value or 0.0,
"type_line": english_card.mtg_type_line or False,
"set": english_card.mtg_set_id.display_name if english_card.mtg_set_id else False,
"collector_number": english_card.mtg_collector_number or False,
**self._mtg_openai_build_rules_payload(english_card),
}
def _mtg_openai_build_prompt(self, analysis_lang):
"""Build the full analysis prompt for one MTG deck.
Args:
analysis_lang: Target output language code.
Returns:
str: Final prompt sent to the OpenAI Responses API.
"""
self.ensure_one()
analysis_language_name = self._mtg_openai_get_analysis_language_name(
analysis_lang
)
deck_payload = {
**self._mtg_openai_get_deck_identity_payload(),
"metrics": self._mtg_openai_get_metric_payload(),
"boards": self._mtg_openai_collect_board_payload(),
}
return (
"Analyze this Magic: The Gathering deck as an experienced Commander deck builder. "
"Explain how the deck functions from a pilot perspective. Focus on the actual card pool, "
"the commander, the mana curve, synergy packages, likely play patterns, and how the deck "
"wins or stabilizes. The first paragraph should work as a concise short description of the deck. "
"Do not invent cards or text. If something is uncertain, say so briefly. "
f"Write every value in the JSON response in {analysis_language_name}. "
"The provided oracle_text values are intentionally in English and must be treated as the authoritative rules text input. "
"Multi-face cards may also include a faces array with the authoritative English rules text for each printed face. "
"Treat those cards as one deck slot that can offer multiple spell options.\n\n"
"Return JSON with exactly this shape:\n"
"{\n"
' "commander_summary": "2-3 short paragraphs in plain text that explain how the deck works. The first paragraph must read like a compact deck blurb.",\n'
' "gameplan_bullets": ["3-6 concise bullets"],\n'
' "pilot_tips": ["3-6 concise bullets"],\n'
' "risk_notes": ["2-5 concise bullets"]\n'
"}\n\n"
"Deck data:\n"
f"{json.dumps(deck_payload, ensure_ascii=False, indent=2)}"
)
def _mtg_openai_build_role_prompt(self, analysis_lang, role_map, batch_payload):
"""Build one prompt that classifies role tags for deck lines.
Args:
analysis_lang: Target output language code.
role_map: Mapping from stable role keys to deck-role records.
batch_payload: Deck-line batch to classify.
Returns:
str: Final prompt for one role-analysis batch.
"""
analysis_language_name = self._mtg_openai_get_analysis_language_name(
analysis_lang
)
role_payload = [
{
"key": role_key,
"name": role_record.display_name,
"note": role_record.note or False,
}
for role_key, role_record in role_map.items()
]
return (
"Analyze these Magic: The Gathering Commander deck cards and assign deckbuilding roles. "
"Use only the allowed role keys. A card may have zero, one, or multiple roles. "
"Prefer precise role tags over broad tagging. Do not invent cards or rules text. "
f"Write every rationale in {analysis_language_name}. "
"The provided oracle_text values are intentionally in English and must be treated as the authoritative rules text input. "
"Multi-face cards may also include a faces array with the authoritative English rules text for each printed face. "
"Treat those cards as one deck slot that can offer multiple spell options.\n\n"
"Return JSON with exactly this shape:\n"
"{\n"
' "cards": [\n'
" {\n"
' "line_id": 123,\n'
' "role_keys": ["ramp", "value"],\n'
' "rationale": "One short sentence."\n'
" }\n"
" ]\n"
"}\n\n"
"Every input line_id must appear exactly once in the cards array.\n\n"
"Allowed roles:\n"
f"{json.dumps(role_payload, ensure_ascii=False, indent=2)}\n\n"
"Cards to classify:\n"
f"{json.dumps(batch_payload, ensure_ascii=False, indent=2)}"
)
def _mtg_openai_build_alternative_prompt(self, analysis_lang, issue_payload):
"""Build the prompt that suggests replacements for problematic lines."""
self.ensure_one()
analysis_language_name = self._mtg_openai_get_analysis_language_name(
analysis_lang
)
deck_payload = {
**self._mtg_openai_get_deck_identity_payload(),
"issues": issue_payload,
}
return (
"You are helping to repair a Magic: The Gathering Commander deck. "
"For each problematic deck line, suggest 2 to 3 replacement cards from the provided in-system candidate pool. "
"Keep the deck inside the active commander color identity and avoid singleton conflicts. "
"Prefer candidates that preserve the card's likely job in the deck, such as ramp, draw, removal, or threat density. "
"Only use candidate card_ids listed under each issue line. "
f"Write every explanation in {analysis_language_name}. "
"The provided oracle_text values are intentionally in English and must be treated as the authoritative rules text input. "
"Multi-face cards may include a faces array and should still be treated as one card slot.\n\n"
"Return JSON with exactly this shape:\n"
"{\n"
' "alternatives": [\n'
" {\n"
' "line_id": 123,\n'
' "summary": "One short sentence.",\n'
' "suggestions": [\n'
" {\n"
' "card_id": 456,\n'
' "reason": "One short sentence."\n'
" }\n"
" ]\n"
" }\n"
" ]\n"
"}\n\n"
"Every input line_id must appear exactly once. If no useful replacement exists for a line, return an empty suggestions list for that line.\n\n"
"Deck repair payload:\n"
f"{json.dumps(deck_payload, ensure_ascii=False, indent=2)}"
)
def _mtg_openai_build_fill_prompt(self, analysis_lang, candidate_payload, batch_size):
"""Build the prompt that fills missing mainboard slots."""
self.ensure_one()
analysis_language_name = self._mtg_openai_get_analysis_language_name(
analysis_lang
)
deck_payload = {
**self._mtg_openai_get_deck_identity_payload(),
"current_metrics": {
**self._mtg_openai_get_metric_payload(),
"tagged_cards": self.mtg_tagged_line_count,
"untagged_cards": self.mtg_untagged_line_count,
"role_coverage_ratio": self.mtg_role_coverage_ratio,
"remaining_mainboard_slots": self.mtg_expected_mainboard_size - self.mtg_mainboard_count,
},
"candidates": candidate_payload,
}
return (
"You are finishing a Magic: The Gathering Commander deck from an in-system card pool. "
"Select the strongest additions for the next batch of missing mainboard slots. "
"Balance mana development, card flow, interaction, threats, and commander synergy. "
"Use only candidate card_ids from the provided pool. "
"Do not invent cards. "
"Use quantity values greater than 1 only when allows_multiple_copies is true. "
f"Write every explanation in {analysis_language_name}. "
"The provided oracle_text values are intentionally in English and must be treated as the authoritative rules text input. "
"Multi-face cards may include a faces array and should still be treated as one card slot.\n\n"
"Return JSON with exactly this shape:\n"
"{\n"
' "summary": "One short paragraph.",\n'
' "cards": [\n'
" {\n"
' "card_id": 456,\n'
' "quantity": 1,\n'
' "reason": "One short sentence."\n'
" }\n"
" ]\n"
"}\n\n"
f"Select at most {batch_size} total cards in this batch.\n\n"
"Deck fill payload:\n"
f"{json.dumps(deck_payload, ensure_ascii=False, indent=2)}"
)
def _mtg_openai_get_deck_identity_payload(self):
"""Return the stable deck identity payload reused across prompts.
Returns:
dict[str, object]: Deck name, format, commander, and color identity.
"""
self.ensure_one()
return {
"deck_name": self.name,
"format": self.mtg_format_id.display_name or False,
"commander": self.mtg_commander_card_id.display_name or False,
"color_identity": {
"name": self.mtg_color_identity_name or False,
"signature": self.mtg_color_identity_signature or False,
},
}
def _mtg_openai_get_metric_payload(self):
"""Return the stable MTG metric payload reused across prompts.
Returns:
dict[str, object]: Current deck metric snapshot.
"""
self.ensure_one()
return {
"mainboard_count": self.mtg_mainboard_count,
"command_zone_count": self.mtg_command_zone_count,
"sideboard_count": self.mtg_sideboard_count,
"maybeboard_count": self.mtg_maybeboard_count,
"average_mana_value": self.mtg_average_mana_value,
"land_count": self.mtg_land_count,
"creature_count": self.mtg_creature_count,
"artifact_count": self.mtg_artifact_count,
"enchantment_count": self.mtg_enchantment_count,
"planeswalker_count": self.mtg_planeswalker_count,
"instant_count": self.mtg_instant_count,
"sorcery_count": self.mtg_sorcery_count,
}
def _mtg_openai_collect_board_payload(self):
"""Collect a structured board snapshot for the OpenAI prompt.
Oracle text is read explicitly in ``en_US`` to keep the rules input
stable even when the active user works in another language.
Returns:
list[dict[str, object]]: Board and card payloads for the prompt.
"""
self.ensure_one()
board_payload = []
for board in self.board_ids.sorted(
lambda current_board: (current_board.sequence, current_board.id)
):
if not board.include_in_total and board.code != "command_zone":
continue
card_payloads = []
for line in board.line_ids.sorted(
lambda current_line: (
current_line.board_sequence,
getattr(current_line, "mtg_mana_value", 0.0),
current_line.card_id.display_name or "",
current_line.id,
)
):
card_payload = self._mtg_openai_build_card_reference_payload(line.card_id)
if (
card_payload["face_count"] <= 1
and card_payload["type_line"]
and "Land" in card_payload["type_line"]
):
card_payload["oracle_text"] = False
card_payloads.append(
{
"quantity": line.quantity,
"roles": line.role_ids.sorted(
key=lambda role: (role.sequence, role.name or "", role.id)
).mapped("name"),
**card_payload,
}
)
board_payload.append(
{
"name": board.name,
"code": board.code,
"include_in_total": board.include_in_total,
"cards": card_payloads,
}
)
return board_payload
def _mtg_openai_collect_role_payload(self):
"""Collect deck-line payloads for role classification.
Returns:
list[dict[str, object]]: Structured deck-line payloads.
"""
self.ensure_one()
payload = []
for line in self.line_ids.sorted(
lambda current_line: (
current_line.board_sequence,
getattr(current_line, "mtg_mana_value", 0.0),
current_line.card_id.display_name or "",
current_line.id,
)
):
payload.append(
{
"line_id": line.id,
"board": line.board_id.code or line.board_id.name,
"quantity": line.quantity,
**self._mtg_openai_build_card_reference_payload(line.card_id),
}
)
return payload
def _mtg_openai_collect_alternative_payload(self):
"""Collect issue lines plus per-line replacement candidates."""
self.ensure_one()
issue_payload = []
issue_lines = self.line_ids.filtered(
lambda line: line.board_id.include_in_total and line.mtg_issue_count
).sorted(
lambda current_line: (
current_line.board_sequence,
current_line.primary_role_sequence,
current_line.mtg_mana_value,
current_line.card_id.display_name or "",
current_line.id,
)
)
for line in issue_lines:
candidates = self._mtg_openai_get_alternative_candidate_cards(
issue_line=line,
limit=self._mtg_openai_get_alternative_candidate_limit(),
)
if not candidates:
continue
issue_payload.append(
{
"line_id": line.id,
"card": self._mtg_openai_build_line_payload(line),
"issues": self._mtg_openai_get_issue_labels(line),
"candidates": [
self._mtg_openai_build_card_payload(card)
for card in candidates
],
}
)
return issue_payload
def _mtg_openai_collect_fill_candidate_payload(self, limit):
"""Collect a balanced candidate pool for AI-driven deck fill."""
self.ensure_one()
candidates = self._mtg_openai_get_fill_candidate_cards(limit=limit)
return [self._mtg_openai_build_card_payload(card) for card in candidates]
def _mtg_openai_get_issue_labels(self, line):
"""Return stable issue labels for one MTG deck line."""
issue_labels = []
if line.mtg_singleton_violation:
issue_labels.append("singleton")
if line.mtg_color_identity_violation:
issue_labels.append("color_identity")
if not line.mtg_legality_ok:
issue_labels.append(line.mtg_legality_status or "illegal")
return issue_labels or ["issue"]
def _mtg_openai_build_line_payload(self, line):
"""Serialize one deck line for OpenAI prompts."""
return {
"line_id": line.id,
"board": line.board_id.code or line.board_id.name,
"quantity": line.quantity,
"roles": line.role_ids.sorted(
key=lambda role: (role.sequence, role.name or "", role.id)
).mapped("name"),
**self._mtg_openai_build_card_reference_payload(line.card_id),
}
def _mtg_openai_build_card_payload(self, card):
"""Serialize one MTG card candidate for OpenAI prompts."""
english_card = card.with_context(lang="en_US")
primary_types = english_card.mtg_card_type_ids.sorted(
key=lambda card_type: (card_type.sequence, card_type.name or "", card_type.id)
).mapped("name")
legality_status = self.env["mvd.tcg.deck.line"]._mtg_get_card_legality_status(
english_card,
self.mtg_format_id.code or "",
)
return {
"card_id": english_card.id,
"color_identity": {
"signature": english_card.mtg_color_identity_signature or False,
"colors": english_card.mtg_color_identity_ids.sorted(
key=lambda color: (color.sequence, color.code or "", color.id)
).mapped("name"),
},
**self._mtg_openai_build_card_reference_payload(english_card),
"rarity": english_card.mtg_rarity_id.display_name if english_card.mtg_rarity_id else False,
"types": primary_types,
"allows_multiple_copies": english_card.mtg_allows_unlimited_copies(),
"current_quantity": self._mtg_openai_get_current_mainboard_quantity(english_card),
"legality_status": legality_status,
}
def _mtg_openai_get_current_mainboard_quantity(self, card):
"""Return the current mainboard quantity for one card record."""
self.ensure_one()
mainboard_line = self.mtg_mainboard_line_ids.filtered(
lambda line: line.card_id == card
)[:1]
return mainboard_line.quantity if mainboard_line else 0
def _mtg_openai_get_existing_singleton_aliases(self):
"""Return singleton aliases currently present in included MTG boards."""
self.ensure_one()
aliases = set()
for card in self.line_ids.filtered(
lambda line: line.board_id.include_in_total and line.card_id.game_id.code == "mtg"
).mapped("card_id").with_context(lang="en_US"):
aliases.update(card.mtg_get_singleton_key_aliases())
return aliases
def _mtg_openai_identity_allows_card(self, card):
"""Return whether one card fits the current commander identity."""
self.ensure_one()
commander_identity = set(self.mtg_color_identity_signature or "")
if not commander_identity:
return True
return not (set(card.mtg_color_identity_signature or "") - commander_identity)
def _mtg_openai_is_legal_candidate(self, card):
"""Return whether one card can be considered for the active format."""
legality_status = self.env["mvd.tcg.deck.line"]._mtg_get_card_legality_status(
card,
self.mtg_format_id.code or "",
)
return legality_status not in {"banned", "not_legal"}
def _mtg_openai_get_base_fill_candidates(self):
"""Return the coarse MTG candidate pool for deck fill."""
self.ensure_one()
card_model = self.env["mvd.tcg.card"].with_context(lang="en_US")
existing_aliases = self._mtg_openai_get_existing_singleton_aliases()
candidates = card_model.browse()
for card in card_model.search(
[
("game_id.code", "=", "mtg"),
("active", "=", True),
("mtg_is_token", "=", False),
("mtg_is_digital", "=", False),
],
order="mtg_collector_sort_key, id",
):
if not self._mtg_openai_identity_allows_card(card):
continue
if not self._mtg_openai_is_legal_candidate(card):
continue
if not card.mtg_allows_unlimited_copies() and (
existing_aliases & set(card.mtg_get_singleton_key_aliases())
):
continue
candidates |= card
return candidates
def _mtg_openai_get_fill_candidate_cards(self, limit):
"""Return a balanced, size-limited MTG candidate pool for deck fill."""
self.ensure_one()
type_priority = (
"land",
"creature",
"artifact",
"enchantment",
"instant",
"sorcery",
"planeswalker",
"other",
)
grouped_candidates = {type_code: [] for type_code in type_priority}
for card in self._mtg_openai_get_base_fill_candidates():
type_codes = set(card.mtg_card_type_ids.mapped("code"))
if "land" in type_codes:
grouped_candidates["land"].append(card.id)
elif "creature" in type_codes:
grouped_candidates["creature"].append(card.id)
elif "artifact" in type_codes:
grouped_candidates["artifact"].append(card.id)
elif "enchantment" in type_codes:
grouped_candidates["enchantment"].append(card.id)
elif "instant" in type_codes:
grouped_candidates["instant"].append(card.id)
elif "sorcery" in type_codes:
grouped_candidates["sorcery"].append(card.id)
elif "planeswalker" in type_codes:
grouped_candidates["planeswalker"].append(card.id)
else:
grouped_candidates["other"].append(card.id)
ordered_ids = []
indices = {type_code: 0 for type_code in type_priority}
while len(ordered_ids) < limit:
did_append = False
for type_code in type_priority:
bucket = grouped_candidates[type_code]
bucket_index = indices[type_code]
if bucket_index >= len(bucket):
continue
ordered_ids.append(bucket[bucket_index])
indices[type_code] += 1
did_append = True
if len(ordered_ids) >= limit:
break
if not did_append:
break
return self.env["mvd.tcg.card"].browse(ordered_ids)
def _mtg_openai_get_alternative_candidate_cards(self, issue_line, limit):
"""Return a narrow replacement pool for one problematic line."""
self.ensure_one()
target_card = issue_line.card_id.with_context(lang="en_US")
target_types = set(target_card.mtg_card_type_ids.mapped("code"))
target_mana_value = target_card.mtg_mana_value or 0.0
existing_aliases = self._mtg_openai_get_existing_singleton_aliases()
candidates = self.env["mvd.tcg.card"].browse()
fallback_candidates = self.env["mvd.tcg.card"].browse()
for card in self._mtg_openai_get_base_fill_candidates():
if card == target_card:
continue
if existing_aliases & set(card.mtg_get_singleton_key_aliases()):
continue
card_types = set(card.mtg_card_type_ids.mapped("code"))
mana_gap = abs((card.mtg_mana_value or 0.0) - target_mana_value)
if target_types and card_types and target_types & card_types:
if mana_gap <= 3.0:
candidates |= card
else:
fallback_candidates |= card
else:
fallback_candidates |= card
ordered_candidates = (candidates | fallback_candidates).sorted(
key=lambda card: (
0
if target_types & set(card.mtg_card_type_ids.mapped("code"))
else 1,
abs((card.mtg_mana_value or 0.0) - target_mana_value),
card.mtg_mana_value or 0.0,
card.display_name or "",
card.id,
)
)
return ordered_candidates[:limit]
def _mtg_openai_add_card_to_board(self, card, board, quantity=1):
"""Create or increment one deck line in a validated way."""
self.ensure_one()
line_model = self.env["mvd.tcg.deck.line"]
existing_line = line_model.search(
[
("board_id", "=", board.id),
("card_id", "=", card.id),
],
limit=1,
)
self._mvd_tcg_validate_add_to_board(
card,
board,
quantity=quantity,
existing_line=existing_line,
)
if existing_line:
existing_line.write({"quantity": existing_line.quantity + quantity})
return existing_line
return line_model.create(
{
"board_id": board.id,
"quantity": quantity,
"card_id": card.id,
}
)
def _mtg_openai_get_role_key_map(self):
"""Return the supported deck-role records keyed by stable AI keys.
Returns:
dict[str, Model]: Stable role-key mapping.
"""
role_xmlids = {
"ramp": "mvd_tcg_deck.mvd_tcg_deck_role_ramp",
"draw": "mvd_tcg_deck.mvd_tcg_deck_role_draw",
"removal": "mvd_tcg_deck.mvd_tcg_deck_role_removal",
"interaction": "mvd_tcg_deck.mvd_tcg_deck_role_interaction",
"protection": "mvd_tcg_deck.mvd_tcg_deck_role_protection",
"wincon": "mvd_tcg_deck.mvd_tcg_deck_role_wincon",
"value": "mvd_tcg_deck.mvd_tcg_deck_role_value",
"combo": "mvd_tcg_deck.mvd_tcg_deck_role_combo",
}
role_keys = list(role_xmlids)
role_model = self.env["mvd.tcg.deck.role"]
roles = role_model.search([("technical_key", "in", role_keys)])
role_map = {role.technical_key: role for role in roles}
missing_role_keys = [role_key for role_key in role_keys if role_key not in role_map]
if missing_role_keys:
for role_key in list(missing_role_keys):
fallback_role = self.env.ref(role_xmlids[role_key], raise_if_not_found=False)
if not fallback_role:
continue
role_map[role_key] = fallback_role
missing_role_keys = [
role_key for role_key in role_keys if role_key not in role_map
]
if missing_role_keys:
raise UserError(
_(
"Missing configured deck roles for: %s"
)
% ", ".join(missing_role_keys)
)
return role_map
def _mtg_openai_prepare_alternative_suggestions(self, issue_payload, alternative_payload):
"""Validate and normalize alternative suggestions returned by OpenAI."""
expected_ids = {item["line_id"] for item in issue_payload}
candidate_ids_by_line = {
item["line_id"]: {candidate["card_id"] for candidate in item["candidates"]}
for item in issue_payload
}
issue_map = {item["line_id"]: item for item in issue_payload}
prepared = []
seen_ids = set()
for item in alternative_payload.get("alternatives") or []:
line_id = item.get("line_id")
if line_id not in expected_ids:
raise UserError(
_("OpenAI returned alternatives for an unknown deck line.")
)
if line_id in seen_ids:
raise UserError(
_("OpenAI returned duplicate alternative rows for one deck line.")
)
seen_ids.add(line_id)
allowed_candidate_ids = candidate_ids_by_line[line_id]
suggestions = []
seen_candidate_ids = set()
for suggestion in item.get("suggestions") or []:
card_id = suggestion.get("card_id")
if card_id not in allowed_candidate_ids:
raise UserError(
_("OpenAI returned an unsupported replacement candidate.")
)
if card_id in seen_candidate_ids:
continue
seen_candidate_ids.add(card_id)
candidate = self.env["mvd.tcg.card"].browse(card_id).exists()
if not candidate:
continue
suggestions.append(
{
"card_id": candidate.id,
"name": candidate.display_name,
"mana_cost": candidate.mtg_mana_cost or False,
"type_line": candidate.mtg_type_line or False,
"reason": (suggestion.get("reason") or "").strip() or False,
}
)
prepared.append(
{
"line_id": line_id,
"card_name": issue_map[line_id]["card"]["name"],
"issues": issue_map[line_id]["issues"],
"summary": (item.get("summary") or "").strip() or False,
"suggestions": suggestions,
}
)
if seen_ids != expected_ids:
raise UserError(
_("OpenAI did not return alternative rows for every issue line.")
)
return prepared
def _mtg_openai_prepare_fill_selection(self, candidate_payload, fill_payload, max_cards):
"""Validate and normalize one batch of fill-card selections."""
allowed_candidates = {
item["card_id"]: item for item in candidate_payload
}
selected_cards = []
seen_card_ids = set()
total_quantity = 0
for item in fill_payload.get("cards") or []:
card_id = item.get("card_id")
candidate = allowed_candidates.get(card_id)
if not candidate:
raise UserError(_("OpenAI returned a card outside the allowed pool."))
if card_id in seen_card_ids:
raise UserError(_("OpenAI returned the same fill card more than once."))
seen_card_ids.add(card_id)
quantity = int(item.get("quantity") or 1)
if quantity <= 0:
raise UserError(_("OpenAI returned an invalid fill quantity."))
if quantity > 1 and not candidate["allows_multiple_copies"]:
raise UserError(
_("OpenAI attempted to add multiple copies of a singleton card.")
)
total_quantity += quantity
if total_quantity > max_cards:
raise UserError(_("OpenAI attempted to add too many cards in one batch."))
selected_cards.append(
{
"card_id": card_id,
"quantity": quantity,
"name": candidate["name"],
"mana_cost": candidate["mana_cost"],
"type_line": candidate["type_line"],
"reason": (item.get("reason") or "").strip() or False,
}
)
return {
"summary": (fill_payload.get("summary") or "").strip() or False,
"cards": selected_cards,
}
def _mtg_openai_prepare_role_updates(self, batch_payload, role_payload, role_map):
"""Validate one role-analysis batch and convert it into line writes.
Args:
batch_payload: Input card payloads for one batch.
role_payload: Parsed JSON payload returned by OpenAI.
role_map: Supported role records keyed by stable AI keys.
Returns:
list[dict[str, object]]: Line updates ready for ORM writes.
"""
expected_ids = {item["line_id"] for item in batch_payload}
cards = role_payload.get("cards") or []
seen_ids = set()
updates = []
for item in cards:
line_id = item.get("line_id")
if line_id not in expected_ids:
raise UserError(
_("OpenAI returned a role analysis for an unknown deck line.")
)
if line_id in seen_ids:
raise UserError(
_("OpenAI returned duplicate role analysis rows for one deck line.")
)
role_ids = []
for role_key in item.get("role_keys") or []:
role_record = role_map.get(role_key)
if not role_record:
raise UserError(
_(
"OpenAI returned an unsupported role key: %(role_key)s",
role_key=role_key,
)
)
role_ids.append(role_record.id)
seen_ids.add(line_id)
updates.append(
{
"line_id": line_id,
"role_ids": role_ids,
"rationale": (item.get("rationale") or "").strip() or False,
}
)
if seen_ids != expected_ids:
raise UserError(
_("OpenAI did not return role assignments for every deck line.")
)
return updates
def _mtg_openai_request_analysis(self, api_key, model_name, prompt):
"""Call the OpenAI Responses API and parse the JSON analysis payload.
Args:
api_key: OpenAI API key.
model_name: OpenAI model identifier.
prompt: Final analysis prompt.
Returns:
tuple[dict[str, object], str]: Parsed analysis JSON and raw response.
"""
schema = {
"type": "object",
"additionalProperties": False,
"properties": {
"commander_summary": {"type": "string"},
"gameplan_bullets": {
"type": "array",
"items": {"type": "string"},
},
"pilot_tips": {
"type": "array",
"items": {"type": "string"},
},
"risk_notes": {
"type": "array",
"items": {"type": "string"},
},
},
"required": [
"commander_summary",
"gameplan_bullets",
"pilot_tips",
"risk_notes",
],
}
return self._mtg_openai_request_structured_payload(
api_key=api_key,
model_name=model_name,
prompt=prompt,
system_instruction=(
"You are an expert Magic: The Gathering Commander analyst. "
"You explain how decks function, how they sequence, and where "
"their strengths and risks lie."
),
response_name="mvd_tcg_mtg_commander_analysis",
schema=schema,
max_output_tokens=3200,
)
def _mtg_openai_request_role_analysis(self, api_key, model_name, prompt):
"""Call the OpenAI Responses API for card-role classification.
Args:
api_key: OpenAI API key.
model_name: OpenAI model identifier.
prompt: Final role-analysis prompt.
Returns:
tuple[dict[str, object], str]: Parsed role JSON and raw response.
"""
schema = {
"type": "object",
"additionalProperties": False,
"properties": {
"cards": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": False,
"properties": {
"line_id": {"type": "integer"},
"role_keys": {
"type": "array",
"items": {
"type": "string",
"enum": [
"ramp",
"draw",
"removal",
"interaction",
"protection",
"wincon",
"value",
"combo",
],
},
},
"rationale": {"type": "string"},
},
"required": ["line_id", "role_keys", "rationale"],
},
}
},
"required": ["cards"],
}
return self._mtg_openai_request_structured_payload(
api_key=api_key,
model_name=model_name,
prompt=prompt,
system_instruction=(
"You are an expert Magic: The Gathering Commander deckbuilding analyst. "
"You classify cards into practical deckbuilding roles such as ramp, draw, "
"interaction, protection, win condition, value, and combo."
),
response_name="mvd_tcg_mtg_card_role_analysis",
schema=schema,
max_output_tokens=6400,
)
def _mtg_openai_request_alternative_analysis(self, api_key, model_name, prompt):
"""Call the Responses API for issue-line replacement suggestions."""
schema = {
"type": "object",
"additionalProperties": False,
"properties": {
"alternatives": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": False,
"properties": {
"line_id": {"type": "integer"},
"summary": {"type": "string"},
"suggestions": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": False,
"properties": {
"card_id": {"type": "integer"},
"reason": {"type": "string"},
},
"required": ["card_id", "reason"],
},
},
},
"required": ["line_id", "summary", "suggestions"],
},
}
},
"required": ["alternatives"],
}
return self._mtg_openai_request_structured_payload(
api_key=api_key,
model_name=model_name,
prompt=prompt,
system_instruction=(
"You are an expert Magic: The Gathering Commander deck builder. "
"You repair illegal or awkward deck lines by proposing replacement cards "
"that preserve deck function, mana curve, and commander synergy."
),
response_name="mvd_tcg_mtg_alternative_suggestions",
schema=schema,
max_output_tokens=7200,
)
def _mtg_openai_request_fill_selection(self, api_key, model_name, prompt):
"""Call the Responses API for a batch of deck-fill card selections."""
schema = {
"type": "object",
"additionalProperties": False,
"properties": {
"summary": {"type": "string"},
"cards": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": False,
"properties": {
"card_id": {"type": "integer"},
"quantity": {"type": "integer"},
"reason": {"type": "string"},
},
"required": ["card_id", "quantity", "reason"],
},
},
},
"required": ["summary", "cards"],
}
return self._mtg_openai_request_structured_payload(
api_key=api_key,
model_name=model_name,
prompt=prompt,
system_instruction=(
"You are an expert Magic: The Gathering Commander deck builder. "
"You complete partial decks from a bounded in-system card pool while keeping "
"mana, interaction, draw, threat density, and commander synergy in balance."
),
response_name="mvd_tcg_mtg_fill_selection",
schema=schema,
max_output_tokens=9600,
)
def _mtg_openai_request_structured_payload(
self,
api_key,
model_name,
prompt,
system_instruction,
response_name,
schema,
max_output_tokens,
):
"""Call the Responses API for one structured JSON response.
Args:
api_key: OpenAI API key.
model_name: OpenAI model identifier.
prompt: Final user prompt.
system_instruction: System message content.
response_name: Stable JSON schema name.
schema: JSON schema definition.
max_output_tokens: Output token limit for the response.
Returns:
tuple[dict[str, object], str]: Parsed JSON payload and raw response.
"""
request_payload = {
"model": model_name,
"reasoning": {"effort": "low"},
"input": [
{
"role": "system",
"content": [
{
"type": "input_text",
"text": system_instruction,
}
],
},
{
"role": "user",
"content": [{"type": "input_text", "text": prompt}],
},
],
"text": {
"verbosity": "low",
"format": {
"type": "json_schema",
"name": response_name,
"schema": schema,
"strict": True,
}
},
"max_output_tokens": max_output_tokens,
}
http_request = request.Request(
f"{self._mtg_openai_get_api_base_url()}/responses",
data=json.dumps(request_payload).encode("utf-8"),
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
method="POST",
)
response_body = False
for attempt in range(1, 4):
try:
with request.urlopen(
http_request,
timeout=self._mtg_openai_get_request_timeout_seconds(),
) as response:
response_body = response.read().decode("utf-8")
break
except error.HTTPError as exc:
error_body = exc.read().decode("utf-8", errors="replace")
if exc.code in {429, 500, 502, 503, 504} and attempt < 3:
time.sleep(attempt * 2)
continue
raise UserError(
f"OpenAI request failed with status {exc.code}: {error_body}"
) from exc
except error.URLError as exc:
if attempt < 3:
time.sleep(attempt * 2)
continue
raise UserError(f"OpenAI request failed: {exc.reason}") from exc
response_payload = json.loads(response_body)
if response_payload.get("status") == "incomplete":
incomplete_reason = (
response_payload.get("incomplete_details", {}).get("reason")
or "unknown"
)
if incomplete_reason == "max_output_tokens":
raise UserError(
"OpenAI deck analysis hit the output token limit before it returned the final JSON."
)
raise UserError(
f"OpenAI deck analysis did not complete successfully: {incomplete_reason}."
)
text_payload = cls._mtg_openai_extract_output_text(response_payload)
try:
analysis_payload = json.loads(text_payload)
except json.JSONDecodeError as exc:
raise UserError("OpenAI returned a non-JSON analysis payload.") from exc
return analysis_payload, response_body
@staticmethod
def _mtg_openai_extract_output_text(response_payload):
"""Extract the text output from one Responses API payload.
Args:
response_payload: Decoded JSON payload returned by Responses.
Returns:
str: Concatenated text output.
"""
output_text = response_payload.get("output_text")
if output_text:
return output_text
fragments = []
for output_item in response_payload.get("output", []):
if output_item.get("type") != "message":
continue
for content_item in output_item.get("content", []):
text_value = content_item.get("text")
if text_value:
fragments.append(text_value)
if fragments:
return "\n".join(fragments)
raise UserError("OpenAI returned no text output for the deck analysis.")
@staticmethod
def _mtg_openai_render_paragraphs(text_value):
"""Render plain text paragraphs into safe Odoo HTML.
Args:
text_value: Plain text paragraph content.
Returns:
str | bool: Safe HTML markup or ``False`` if empty.
"""
clean_value = (text_value or "").strip()
if not clean_value:
return False
paragraphs = [
f"<p>{html.escape(paragraph.strip())}</p>"
for paragraph in clean_value.split("\n")
if paragraph.strip()
]
return "".join(paragraphs) or False
@staticmethod
def _mtg_openai_render_bullets(items):
"""Render a list of plain-text bullets into safe Odoo HTML.
Args:
items: Bullet strings.
Returns:
str | bool: Safe HTML markup or ``False`` if empty.
"""
cleaned_items = [
html.escape((item or "").strip())
for item in (items or [])
if (item or "").strip()
]
if not cleaned_items:
return False
bullet_markup = "".join(f"<li>{item}</li>" for item in cleaned_items)
return f"<ul>{bullet_markup}</ul>"
@staticmethod
def _mtg_openai_render_alternative_suggestions(prepared_suggestions):
"""Render alternative suggestions as compact deck-analysis HTML."""
sections = []
for item in prepared_suggestions:
issue_markup = ", ".join(html.escape(issue) for issue in item["issues"])
summary_markup = (
f"<p>{html.escape(item['summary'])}</p>" if item["summary"] else ""
)
if item["suggestions"]:
suggestion_markup = ""
for suggestion in item["suggestions"]:
mana_markup = (
f" {html.escape(suggestion['mana_cost'])}"
if suggestion["mana_cost"]
else ""
)
type_markup = (
f" <span class='text-muted'>{html.escape(suggestion['type_line'])}</span>"
if suggestion["type_line"]
else ""
)
reason_markup = (
f" <span class='text-muted'>- {html.escape(suggestion['reason'])}</span>"
if suggestion["reason"]
else ""
)
suggestion_markup += (
"<li>"
f"<strong>{html.escape(suggestion['name'])}</strong>"
f"{mana_markup}{type_markup}{reason_markup}"
"</li>"
)
else:
suggestion_markup = "<li>No strong in-system replacement was found.</li>"
sections.append(
(
"<div class=\"o_mvd_tcg_openai_result_block\">"
f"<h4>{html.escape(item['card_name'])}</h4>"
f"<p><strong>Issue</strong>: {issue_markup}</p>"
f"{summary_markup}"
f"<ul>{suggestion_markup}</ul>"
"</div>"
)
)
return "".join(sections) or False
@staticmethod
def _mtg_openai_render_fill_summary(fill_batches, total_added, remaining_slots):
"""Render fill batches as compact deck-analysis HTML."""
intro = (
f"<p>Added <strong>{total_added}</strong> card(s). "
f"Remaining mainboard slots: <strong>{remaining_slots}</strong>.</p>"
)
batch_markup = []
for index, batch in enumerate(fill_batches, start=1):
summary_markup = (
f"<p>{html.escape(batch['summary'])}</p>" if batch["summary"] else ""
)
card_markup = ""
for suggestion in batch["cards"]:
mana_markup = (
f" {html.escape(suggestion['mana_cost'])}"
if suggestion["mana_cost"]
else ""
)
type_markup = (
f" <span class='text-muted'>{html.escape(suggestion['type_line'])}</span>"
if suggestion["type_line"]
else ""
)
reason_markup = (
f" <span class='text-muted'>- {html.escape(suggestion['reason'])}</span>"
if suggestion["reason"]
else ""
)
card_markup += (
"<li>"
f"<strong>{suggestion['quantity']}x {html.escape(suggestion['name'])}</strong>"
f"{mana_markup}{type_markup}{reason_markup}"
"</li>"
)
batch_markup.append(
(
"<div class=\"o_mvd_tcg_openai_result_block\">"
f"<h4>Fill Batch {index}</h4>"
f"{summary_markup}"
f"<ul>{card_markup}</ul>"
"</div>"
)
)
return intro + "".join(batch_markup)