2104 lines
83 KiB
Python
2104 lines
83 KiB
Python
"""OpenAI-powered MTG deck analysis."""
|
|
|
|
import html
|
|
import json
|
|
import os
|
|
import time
|
|
from urllib import error, parse, request
|
|
|
|
from odoo import _, fields, models
|
|
from odoo.exceptions import UserError
|
|
|
|
from .constants import (
|
|
DEFAULT_MTG_OPENAI_API_BASE_URL,
|
|
DEFAULT_MTG_OPENAI_ALTERNATIVE_CANDIDATE_LIMIT,
|
|
DEFAULT_MTG_OPENAI_FILL_BATCH_SIZE,
|
|
DEFAULT_MTG_OPENAI_FILL_CANDIDATE_LIMIT,
|
|
DEFAULT_MTG_OPENAI_MODEL_NAME,
|
|
DEFAULT_MTG_OPENAI_REQUEST_TIMEOUT_SECONDS,
|
|
DEFAULT_MTG_OPENAI_ROLE_BATCH_SIZE,
|
|
)
|
|
|
|
|
|
class MvdTcgDeck(models.Model):
|
|
"""Extend MTG decks with on-demand OpenAI analysis fields."""
|
|
|
|
_inherit = "mvd.tcg.deck"
|
|
_MTG_OPENAI_ALLOWED_BASE_HOSTS = frozenset({"api.openai.com"})
|
|
|
|
_MTG_OPENAI_BATCH_SEPARATOR = "\n\n--- batch ---\n\n"
|
|
_MTG_OPENAI_STATE_SELECTION = [
|
|
("empty", "Not Analyzed"),
|
|
("done", "Ready"),
|
|
("failed", "Failed"),
|
|
]
|
|
_MTG_OPENAI_ROLE_BATCH_SIZE = DEFAULT_MTG_OPENAI_ROLE_BATCH_SIZE
|
|
_MTG_OPENAI_FILL_CANDIDATE_LIMIT = DEFAULT_MTG_OPENAI_FILL_CANDIDATE_LIMIT
|
|
_MTG_OPENAI_FILL_BATCH_SIZE = DEFAULT_MTG_OPENAI_FILL_BATCH_SIZE
|
|
_MTG_OPENAI_ALTERNATIVE_CANDIDATE_LIMIT = DEFAULT_MTG_OPENAI_ALTERNATIVE_CANDIDATE_LIMIT
|
|
_MTG_OPENAI_RUN_FIELD_MAP = {
|
|
"analysis": {
|
|
"state": "mtg_openai_analysis_state",
|
|
"error": "mtg_openai_last_error",
|
|
"analyzed_at": "mtg_openai_last_analyzed_at",
|
|
"model": "mtg_openai_last_model",
|
|
"lang": "mtg_openai_last_lang",
|
|
"prompt": "mtg_openai_last_prompt",
|
|
"response": "mtg_openai_last_response",
|
|
},
|
|
"role": {
|
|
"state": "mtg_openai_role_analysis_state",
|
|
"error": "mtg_openai_role_last_error",
|
|
"analyzed_at": "mtg_openai_role_last_analyzed_at",
|
|
"model": "mtg_openai_role_last_model",
|
|
"lang": "mtg_openai_role_last_lang",
|
|
"prompt": "mtg_openai_role_last_prompt",
|
|
"response": "mtg_openai_role_last_response",
|
|
},
|
|
"alternative": {
|
|
"state": "mtg_openai_alternative_state",
|
|
"error": "mtg_openai_alternative_last_error",
|
|
"analyzed_at": "mtg_openai_alternative_last_analyzed_at",
|
|
"model": "mtg_openai_alternative_last_model",
|
|
"lang": "mtg_openai_alternative_last_lang",
|
|
"prompt": "mtg_openai_alternative_last_prompt",
|
|
"response": "mtg_openai_alternative_last_response",
|
|
},
|
|
"fill": {
|
|
"state": "mtg_openai_fill_state",
|
|
"error": "mtg_openai_fill_last_error",
|
|
"analyzed_at": "mtg_openai_fill_last_analyzed_at",
|
|
"model": "mtg_openai_fill_last_model",
|
|
"lang": "mtg_openai_fill_last_lang",
|
|
"prompt": "mtg_openai_fill_last_prompt",
|
|
"response": "mtg_openai_fill_last_response",
|
|
},
|
|
}
|
|
|
|
mtg_openai_analysis_state = fields.Selection(
|
|
selection=_MTG_OPENAI_STATE_SELECTION,
|
|
string="Deck Analysis Status",
|
|
default="empty",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_last_analyzed_at = fields.Datetime(
|
|
string="Last Deck Analysis",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_last_model = fields.Char(
|
|
string="Analysis Model",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_last_lang = fields.Char(
|
|
string="Analysis Language",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_commander_summary = fields.Html(
|
|
string="Commander Summary",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_gameplan = fields.Html(
|
|
string="Game Plan",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_pilot_tips = fields.Html(
|
|
string="Pilot Tips",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_risk_notes = fields.Html(
|
|
string="Risk Notes",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_last_error = fields.Text(
|
|
string="Last Analysis Error",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_last_prompt = fields.Text(
|
|
string="Last Analysis Prompt",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_last_response = fields.Text(
|
|
string="Last Analysis Response",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_role_analysis_state = fields.Selection(
|
|
selection=_MTG_OPENAI_STATE_SELECTION,
|
|
string="Role Analysis Status",
|
|
default="empty",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_role_last_analyzed_at = fields.Datetime(
|
|
string="Last Role Analysis",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_role_last_model = fields.Char(
|
|
string="Role Analysis Model",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_role_last_lang = fields.Char(
|
|
string="Role Analysis Language",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_role_last_error = fields.Text(
|
|
string="Last Role Analysis Error",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_role_last_prompt = fields.Text(
|
|
string="Last Role Analysis Prompt",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_role_last_response = fields.Text(
|
|
string="Last Role Analysis Response",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_alternative_state = fields.Selection(
|
|
selection=_MTG_OPENAI_STATE_SELECTION,
|
|
string="Alternative Analysis Status",
|
|
default="empty",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_alternative_last_analyzed_at = fields.Datetime(
|
|
string="Last Alternative Analysis",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_alternative_last_model = fields.Char(
|
|
string="Alternative Analysis Model",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_alternative_last_lang = fields.Char(
|
|
string="Alternative Analysis Language",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_alternative_suggestions = fields.Html(
|
|
string="Alternative Suggestions",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_alternative_last_error = fields.Text(
|
|
string="Last Alternative Analysis Error",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_alternative_last_prompt = fields.Text(
|
|
string="Last Alternative Analysis Prompt",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_alternative_last_response = fields.Text(
|
|
string="Last Alternative Analysis Response",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_fill_state = fields.Selection(
|
|
selection=_MTG_OPENAI_STATE_SELECTION,
|
|
string="Deck Fill Status",
|
|
default="empty",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_fill_last_analyzed_at = fields.Datetime(
|
|
string="Last Deck Fill",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_fill_last_model = fields.Char(
|
|
string="Deck Fill Model",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_fill_last_lang = fields.Char(
|
|
string="Deck Fill Language",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_fill_summary = fields.Html(
|
|
string="Deck Fill Summary",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_fill_last_error = fields.Text(
|
|
string="Last Deck Fill Error",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
mtg_openai_fill_last_prompt = fields.Text(
|
|
string="Last Deck Fill Prompt",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
mtg_openai_fill_last_response = fields.Text(
|
|
string="Last Deck Fill Response",
|
|
readonly=True,
|
|
copy=False,
|
|
groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system",
|
|
)
|
|
|
|
def _mtg_openai_check_manager_access(self):
|
|
"""Require manager-level rights for OpenAI-triggering actions.
|
|
|
|
Returns:
|
|
bool: ``True`` when the current user may trigger OpenAI flows.
|
|
"""
|
|
if self.env.is_superuser() or any(
|
|
self.env.user.has_group(xmlid)
|
|
for xmlid in (
|
|
"mvd_tcg_base.mvd_tcg_base_group_manager",
|
|
"base.group_system",
|
|
)
|
|
):
|
|
return True
|
|
raise UserError(
|
|
_("Only TCG managers can run OpenAI deck analysis actions.")
|
|
)
|
|
|
|
@classmethod
|
|
def _mtg_openai_validate_base_url(cls, url):
|
|
"""Validate the configured OpenAI base URL.
|
|
|
|
Args:
|
|
url: Absolute OpenAI API base URL.
|
|
|
|
Returns:
|
|
str: Validated absolute base URL without a trailing slash.
|
|
|
|
Raises:
|
|
UserError: If the URL does not target the official HTTPS API host.
|
|
"""
|
|
normalized_url = (url or "").strip().rstrip("/")
|
|
parsed_url = parse.urlparse(normalized_url)
|
|
hostname = (parsed_url.hostname or "").lower()
|
|
if parsed_url.scheme != "https" or hostname not in cls._MTG_OPENAI_ALLOWED_BASE_HOSTS:
|
|
raise UserError(
|
|
_("OpenAI API Base URL must use HTTPS and the official api.openai.com host.")
|
|
)
|
|
return normalized_url
|
|
|
|
def action_mtg_openai_analyze_deck(self):
|
|
"""Analyze one MTG deck with OpenAI and persist the result.
|
|
|
|
The analysis language follows the current user language, while the
|
|
Oracle text payload remains pinned to ``en_US`` for rules accuracy.
|
|
"""
|
|
self.ensure_one()
|
|
self._mtg_openai_check_manager_access()
|
|
if not self.is_mtg_deck:
|
|
raise UserError(_("OpenAI deck analysis is only available for Magic decks."))
|
|
|
|
api_key = self._mtg_openai_get_api_key()
|
|
model_name = self._mtg_openai_get_model_name()
|
|
analysis_lang = self._mtg_openai_get_analysis_lang()
|
|
prompt = self._mtg_openai_build_prompt(analysis_lang)
|
|
try:
|
|
analysis_payload, raw_response = self._mtg_openai_request_analysis(
|
|
api_key=api_key,
|
|
model_name=model_name,
|
|
prompt=prompt,
|
|
)
|
|
except UserError as exc:
|
|
self.write(
|
|
{
|
|
"mtg_openai_analysis_state": "failed",
|
|
"mtg_openai_last_error": str(exc),
|
|
"mtg_openai_last_analyzed_at": fields.Datetime.now(),
|
|
"mtg_openai_last_model": model_name,
|
|
"mtg_openai_last_lang": analysis_lang,
|
|
"mtg_openai_last_prompt": prompt,
|
|
}
|
|
)
|
|
raise
|
|
|
|
self.write(
|
|
{
|
|
"mtg_openai_analysis_state": "done",
|
|
"mtg_openai_last_error": False,
|
|
"mtg_openai_last_analyzed_at": fields.Datetime.now(),
|
|
"mtg_openai_last_model": model_name,
|
|
"mtg_openai_last_lang": analysis_lang,
|
|
"mtg_openai_last_prompt": prompt,
|
|
"mtg_openai_last_response": raw_response,
|
|
"mtg_openai_commander_summary": self._mtg_openai_render_paragraphs(
|
|
analysis_payload.get("commander_summary")
|
|
),
|
|
"mtg_openai_gameplan": self._mtg_openai_render_bullets(
|
|
analysis_payload.get("gameplan_bullets")
|
|
),
|
|
"mtg_openai_pilot_tips": self._mtg_openai_render_bullets(
|
|
analysis_payload.get("pilot_tips")
|
|
),
|
|
"mtg_openai_risk_notes": self._mtg_openai_render_bullets(
|
|
analysis_payload.get("risk_notes")
|
|
),
|
|
}
|
|
)
|
|
return {
|
|
"type": "ir.actions.act_window",
|
|
"res_model": "mvd.tcg.deck",
|
|
"res_id": self.id,
|
|
"view_mode": "form",
|
|
"target": "current",
|
|
}
|
|
|
|
def action_mtg_openai_analyze_card_roles(self):
|
|
"""Analyze role tags for the cards of one MTG deck.
|
|
|
|
The analysis language follows the active user language, while
|
|
``oracle_text`` remains pinned to ``en_US`` for rules accuracy.
|
|
|
|
Returns:
|
|
dict: Window action that reloads the current deck form.
|
|
"""
|
|
self.ensure_one()
|
|
self._mtg_openai_check_manager_access()
|
|
if not self.is_mtg_deck:
|
|
raise UserError(_("OpenAI card analysis is only available for Magic decks."))
|
|
|
|
api_key = self._mtg_openai_get_api_key()
|
|
model_name = self._mtg_openai_get_model_name()
|
|
analysis_lang = self._mtg_openai_get_analysis_lang()
|
|
role_map = self._mtg_openai_get_role_key_map()
|
|
card_payload = self._mtg_openai_collect_role_payload()
|
|
if not card_payload:
|
|
raise UserError(_("This deck has no cards to analyze."))
|
|
|
|
batch_size = self._mtg_openai_get_role_batch_size()
|
|
prompt_batches = []
|
|
response_batches = []
|
|
pending_updates = []
|
|
try:
|
|
for batch_start in range(0, len(card_payload), batch_size):
|
|
batch_payload = card_payload[batch_start:batch_start + batch_size]
|
|
prompt = self._mtg_openai_build_role_prompt(
|
|
analysis_lang=analysis_lang,
|
|
role_map=role_map,
|
|
batch_payload=batch_payload,
|
|
)
|
|
prompt_batches.append(prompt)
|
|
role_payload, raw_response = self._mtg_openai_request_role_analysis(
|
|
api_key=api_key,
|
|
model_name=model_name,
|
|
prompt=prompt,
|
|
)
|
|
response_batches.append(raw_response)
|
|
pending_updates.extend(
|
|
self._mtg_openai_prepare_role_updates(
|
|
batch_payload=batch_payload,
|
|
role_payload=role_payload,
|
|
role_map=role_map,
|
|
)
|
|
)
|
|
except UserError as exc:
|
|
self.write(
|
|
{
|
|
"mtg_openai_role_analysis_state": "failed",
|
|
"mtg_openai_role_last_error": str(exc),
|
|
"mtg_openai_role_last_analyzed_at": fields.Datetime.now(),
|
|
"mtg_openai_role_last_model": model_name,
|
|
"mtg_openai_role_last_lang": analysis_lang,
|
|
"mtg_openai_role_last_prompt": "\n\n--- batch ---\n\n".join(
|
|
prompt_batches
|
|
),
|
|
"mtg_openai_role_last_response": "\n\n--- batch ---\n\n".join(
|
|
response_batches
|
|
) or False,
|
|
}
|
|
)
|
|
raise
|
|
|
|
for values in pending_updates:
|
|
line = self.env["mvd.tcg.deck.line"].browse(values["line_id"])
|
|
line.write(
|
|
{
|
|
"role_ids": [(6, 0, values["role_ids"])],
|
|
"mtg_openai_role_rationale": values["rationale"],
|
|
}
|
|
)
|
|
|
|
self.write(
|
|
{
|
|
"mtg_openai_role_analysis_state": "done",
|
|
"mtg_openai_role_last_error": False,
|
|
"mtg_openai_role_last_analyzed_at": fields.Datetime.now(),
|
|
"mtg_openai_role_last_model": model_name,
|
|
"mtg_openai_role_last_lang": analysis_lang,
|
|
"mtg_openai_role_last_prompt": "\n\n--- batch ---\n\n".join(
|
|
prompt_batches
|
|
),
|
|
"mtg_openai_role_last_response": "\n\n--- batch ---\n\n".join(
|
|
response_batches
|
|
),
|
|
}
|
|
)
|
|
return {
|
|
"type": "ir.actions.act_window",
|
|
"res_model": "mvd.tcg.deck",
|
|
"res_id": self.id,
|
|
"view_mode": "form",
|
|
"target": "current",
|
|
}
|
|
|
|
def action_mtg_openai_suggest_alternatives(self):
|
|
"""Suggest replacement cards for current MTG issue lines."""
|
|
self.ensure_one()
|
|
self._mtg_openai_check_manager_access()
|
|
if not self.is_mtg_deck:
|
|
raise UserError(
|
|
_("OpenAI alternative suggestions are only available for Magic decks.")
|
|
)
|
|
|
|
issue_payload = self._mtg_openai_collect_alternative_payload()
|
|
if not issue_payload:
|
|
raise UserError(_("This deck has no replaceable issue lines right now."))
|
|
|
|
api_key = self._mtg_openai_get_api_key()
|
|
model_name = self._mtg_openai_get_model_name()
|
|
analysis_lang = self._mtg_openai_get_analysis_lang()
|
|
prompt = self._mtg_openai_build_alternative_prompt(
|
|
analysis_lang=analysis_lang,
|
|
issue_payload=issue_payload,
|
|
)
|
|
try:
|
|
alternative_payload, raw_response = self._mtg_openai_request_alternative_analysis(
|
|
api_key=api_key,
|
|
model_name=model_name,
|
|
prompt=prompt,
|
|
)
|
|
prepared_suggestions = self._mtg_openai_prepare_alternative_suggestions(
|
|
issue_payload=issue_payload,
|
|
alternative_payload=alternative_payload,
|
|
)
|
|
except UserError as exc:
|
|
self.write(
|
|
{
|
|
"mtg_openai_alternative_state": "failed",
|
|
"mtg_openai_alternative_last_error": str(exc),
|
|
"mtg_openai_alternative_last_analyzed_at": fields.Datetime.now(),
|
|
"mtg_openai_alternative_last_model": model_name,
|
|
"mtg_openai_alternative_last_lang": analysis_lang,
|
|
"mtg_openai_alternative_last_prompt": prompt,
|
|
}
|
|
)
|
|
raise
|
|
|
|
self.write(
|
|
{
|
|
"mtg_openai_alternative_state": "done",
|
|
"mtg_openai_alternative_last_error": False,
|
|
"mtg_openai_alternative_last_analyzed_at": fields.Datetime.now(),
|
|
"mtg_openai_alternative_last_model": model_name,
|
|
"mtg_openai_alternative_last_lang": analysis_lang,
|
|
"mtg_openai_alternative_last_prompt": prompt,
|
|
"mtg_openai_alternative_last_response": raw_response,
|
|
"mtg_openai_alternative_suggestions": self._mtg_openai_render_alternative_suggestions(
|
|
prepared_suggestions
|
|
),
|
|
}
|
|
)
|
|
return {
|
|
"type": "ir.actions.act_window",
|
|
"res_model": "mvd.tcg.deck",
|
|
"res_id": self.id,
|
|
"view_mode": "form",
|
|
"target": "current",
|
|
}
|
|
|
|
def action_mtg_openai_fill_deck(self):
|
|
"""Fill missing mainboard slots from the in-system MTG card pool."""
|
|
self.ensure_one()
|
|
self._mtg_openai_check_manager_access()
|
|
if not self.is_mtg_deck:
|
|
raise UserError(_("OpenAI deck fill is only available for Magic decks."))
|
|
if not self.mtg_commander_card_id:
|
|
raise UserError(_("Select at least one commander card first."))
|
|
if not self.mtg_mainboard_board_id:
|
|
raise UserError(_("This deck does not have a mainboard yet."))
|
|
if not self.mtg_expected_mainboard_size:
|
|
raise UserError(_("Select a supported MTG format before using deck fill."))
|
|
|
|
remaining_slots = self.mtg_expected_mainboard_size - self.mtg_mainboard_count
|
|
if remaining_slots <= 0:
|
|
raise UserError(_("The mainboard is already full for the selected format."))
|
|
|
|
api_key = self._mtg_openai_get_api_key()
|
|
model_name = self._mtg_openai_get_model_name()
|
|
analysis_lang = self._mtg_openai_get_analysis_lang()
|
|
rendered_batches = []
|
|
prompt_batches = []
|
|
response_batches = []
|
|
total_added = 0
|
|
|
|
try:
|
|
while remaining_slots > 0:
|
|
candidate_payload = self._mtg_openai_collect_fill_candidate_payload(
|
|
limit=self._mtg_openai_get_fill_candidate_limit()
|
|
)
|
|
if not candidate_payload:
|
|
break
|
|
|
|
batch_size = min(
|
|
remaining_slots,
|
|
self._mtg_openai_get_fill_batch_size(),
|
|
len(candidate_payload),
|
|
)
|
|
prompt = self._mtg_openai_build_fill_prompt(
|
|
analysis_lang=analysis_lang,
|
|
candidate_payload=candidate_payload,
|
|
batch_size=batch_size,
|
|
)
|
|
prompt_batches.append(prompt)
|
|
fill_payload, raw_response = self._mtg_openai_request_fill_selection(
|
|
api_key=api_key,
|
|
model_name=model_name,
|
|
prompt=prompt,
|
|
)
|
|
response_batches.append(raw_response)
|
|
prepared_fill = self._mtg_openai_prepare_fill_selection(
|
|
candidate_payload=candidate_payload,
|
|
fill_payload=fill_payload,
|
|
max_cards=batch_size,
|
|
)
|
|
if not prepared_fill["cards"]:
|
|
break
|
|
rendered_batches.append(prepared_fill)
|
|
for suggestion in prepared_fill["cards"]:
|
|
card = self.env["mvd.tcg.card"].browse(suggestion["card_id"]).exists()
|
|
if not card:
|
|
continue
|
|
self._mtg_openai_add_card_to_board(
|
|
card=card,
|
|
board=self.mtg_mainboard_board_id,
|
|
quantity=suggestion["quantity"],
|
|
)
|
|
total_added += suggestion["quantity"]
|
|
self.invalidate_recordset(
|
|
[
|
|
"mtg_mainboard_count",
|
|
"mtg_rule_warning_count",
|
|
"mtg_duplicate_card_count",
|
|
]
|
|
)
|
|
remaining_slots = self.mtg_expected_mainboard_size - self.mtg_mainboard_count
|
|
except UserError as exc:
|
|
self.write(
|
|
{
|
|
"mtg_openai_fill_state": "failed",
|
|
"mtg_openai_fill_last_error": str(exc),
|
|
"mtg_openai_fill_last_analyzed_at": fields.Datetime.now(),
|
|
"mtg_openai_fill_last_model": model_name,
|
|
"mtg_openai_fill_last_lang": analysis_lang,
|
|
"mtg_openai_fill_last_prompt": "\n\n--- batch ---\n\n".join(
|
|
prompt_batches
|
|
),
|
|
"mtg_openai_fill_last_response": "\n\n--- batch ---\n\n".join(
|
|
response_batches
|
|
) or False,
|
|
}
|
|
)
|
|
raise
|
|
|
|
summary_markup = self._mtg_openai_render_fill_summary(
|
|
fill_batches=rendered_batches,
|
|
total_added=total_added,
|
|
remaining_slots=max(
|
|
0,
|
|
self.mtg_expected_mainboard_size - self.mtg_mainboard_count,
|
|
),
|
|
)
|
|
self.write(
|
|
{
|
|
"mtg_openai_fill_state": "done",
|
|
"mtg_openai_fill_last_error": False,
|
|
"mtg_openai_fill_last_analyzed_at": fields.Datetime.now(),
|
|
"mtg_openai_fill_last_model": model_name,
|
|
"mtg_openai_fill_last_lang": analysis_lang,
|
|
"mtg_openai_fill_last_prompt": "\n\n--- batch ---\n\n".join(
|
|
prompt_batches
|
|
),
|
|
"mtg_openai_fill_last_response": "\n\n--- batch ---\n\n".join(
|
|
response_batches
|
|
) or False,
|
|
"mtg_openai_fill_summary": summary_markup,
|
|
}
|
|
)
|
|
return {
|
|
"type": "ir.actions.act_window",
|
|
"res_model": "mvd.tcg.deck",
|
|
"res_id": self.id,
|
|
"view_mode": "form",
|
|
"target": "current",
|
|
}
|
|
|
|
def _mtg_openai_get_api_key(self):
|
|
"""Return the configured OpenAI API key.
|
|
|
|
Returns:
|
|
str: The configured OpenAI API key.
|
|
"""
|
|
config_key = "mvd_tcg_mtg_deck_openai.api_key"
|
|
api_key = (self._mtg_openai_get_config_parameter(config_key) or "").strip()
|
|
if not api_key:
|
|
api_key = (os.getenv("OPENAI_API_KEY") or "").strip()
|
|
if not api_key:
|
|
raise UserError(_("Configure an OpenAI API key in TCG Settings first."))
|
|
return api_key
|
|
|
|
def _mtg_openai_get_config_parameter(self, name, default=False):
|
|
"""Return one OpenAI configuration parameter.
|
|
|
|
Args:
|
|
name: Technical config parameter key.
|
|
default: Fallback value when the parameter is unset.
|
|
|
|
Returns:
|
|
str | bool: Stored config value or the provided fallback.
|
|
"""
|
|
return self.env["ir.config_parameter"].sudo().get_param(name, default)
|
|
|
|
def _mtg_openai_get_positive_integer_setting(self, name, default):
|
|
"""Return one positive integer connector setting.
|
|
|
|
Args:
|
|
name: Technical config parameter key.
|
|
default: Fallback integer when the parameter is unset or invalid.
|
|
|
|
Returns:
|
|
int: Positive integer configuration value.
|
|
"""
|
|
raw_value = self._mtg_openai_get_config_parameter(name, default)
|
|
try:
|
|
return max(1, int(raw_value))
|
|
except (TypeError, ValueError):
|
|
return default
|
|
|
|
def _mtg_openai_get_model_name(self):
|
|
"""Return the configured OpenAI model name.
|
|
|
|
Returns:
|
|
str: The OpenAI model identifier.
|
|
"""
|
|
config_key = "mvd_tcg_mtg_deck_openai.model_name"
|
|
model_name = (
|
|
self._mtg_openai_get_config_parameter(
|
|
config_key,
|
|
DEFAULT_MTG_OPENAI_MODEL_NAME,
|
|
)
|
|
or DEFAULT_MTG_OPENAI_MODEL_NAME
|
|
).strip()
|
|
if model_name == DEFAULT_MTG_OPENAI_MODEL_NAME:
|
|
env_model_name = (os.getenv("OPENAI_MODEL") or "").strip()
|
|
if env_model_name:
|
|
return env_model_name
|
|
return model_name
|
|
|
|
def _mtg_openai_get_api_base_url(self):
|
|
"""Return the configured OpenAI API base URL.
|
|
|
|
Returns:
|
|
str: Base URL for Responses API requests.
|
|
"""
|
|
return self._mtg_openai_validate_base_url(
|
|
(
|
|
self._mtg_openai_get_config_parameter(
|
|
"mvd_tcg_mtg_deck_openai.api_base_url",
|
|
DEFAULT_MTG_OPENAI_API_BASE_URL,
|
|
)
|
|
or DEFAULT_MTG_OPENAI_API_BASE_URL
|
|
)
|
|
)
|
|
|
|
def _mtg_openai_get_request_timeout_seconds(self):
|
|
"""Return the configured OpenAI request timeout."""
|
|
return self._mtg_openai_get_positive_integer_setting(
|
|
"mvd_tcg_mtg_deck_openai.request_timeout_seconds",
|
|
DEFAULT_MTG_OPENAI_REQUEST_TIMEOUT_SECONDS,
|
|
)
|
|
|
|
def _mtg_openai_get_role_batch_size(self):
|
|
"""Return the configured role-analysis batch size."""
|
|
return self._mtg_openai_get_positive_integer_setting(
|
|
"mvd_tcg_mtg_deck_openai.role_batch_size",
|
|
self._MTG_OPENAI_ROLE_BATCH_SIZE,
|
|
)
|
|
|
|
def _mtg_openai_get_fill_candidate_limit(self):
|
|
"""Return the configured fill candidate pool size."""
|
|
return self._mtg_openai_get_positive_integer_setting(
|
|
"mvd_tcg_mtg_deck_openai.fill_candidate_limit",
|
|
self._MTG_OPENAI_FILL_CANDIDATE_LIMIT,
|
|
)
|
|
|
|
def _mtg_openai_get_fill_batch_size(self):
|
|
"""Return the configured fill selection batch size."""
|
|
return self._mtg_openai_get_positive_integer_setting(
|
|
"mvd_tcg_mtg_deck_openai.fill_batch_size",
|
|
self._MTG_OPENAI_FILL_BATCH_SIZE,
|
|
)
|
|
|
|
def _mtg_openai_get_alternative_candidate_limit(self):
|
|
"""Return the configured alternative suggestion pool size."""
|
|
return self._mtg_openai_get_positive_integer_setting(
|
|
"mvd_tcg_mtg_deck_openai.alternative_candidate_limit",
|
|
self._MTG_OPENAI_ALTERNATIVE_CANDIDATE_LIMIT,
|
|
)
|
|
|
|
def _mtg_openai_get_analysis_lang(self):
|
|
"""Return the active analysis language code.
|
|
|
|
Returns:
|
|
str: Odoo language code from context or user preferences.
|
|
"""
|
|
return (
|
|
self.env.context.get("lang")
|
|
or self.env.user.lang
|
|
or "en_US"
|
|
)
|
|
|
|
def _mtg_openai_get_analysis_language_name(self, lang_code):
|
|
"""Resolve a human-readable language name for one Odoo language code.
|
|
|
|
Args:
|
|
lang_code: Odoo language code such as ``de_DE``.
|
|
|
|
Returns:
|
|
str: Display name for prompting, for example ``German``.
|
|
"""
|
|
language = self.env["res.lang"].sudo().search([("code", "=", lang_code)], limit=1)
|
|
return language.name or lang_code or "English"
|
|
|
|
@classmethod
|
|
def _mtg_openai_join_batches(cls, items):
|
|
"""Join batched prompts or responses with one stable separator.
|
|
|
|
Args:
|
|
items: Ordered prompt or response chunks.
|
|
|
|
Returns:
|
|
str | bool: Joined payload text, or ``False`` when empty.
|
|
"""
|
|
cleaned_items = [item for item in items if item]
|
|
if not cleaned_items:
|
|
return False
|
|
return cls._MTG_OPENAI_BATCH_SEPARATOR.join(cleaned_items)
|
|
|
|
def _mtg_openai_write_run_state(
|
|
self,
|
|
run_key,
|
|
*,
|
|
state,
|
|
model_name,
|
|
analysis_lang,
|
|
prompt=False,
|
|
response=False,
|
|
error_message=False,
|
|
extra_values=None,
|
|
):
|
|
"""Persist one OpenAI run state update on the current deck.
|
|
|
|
Args:
|
|
run_key: Stable OpenAI run key such as ``analysis`` or ``fill``.
|
|
state: Target state value.
|
|
model_name: OpenAI model identifier used for the run.
|
|
analysis_lang: Active Odoo analysis language code.
|
|
prompt: Serialized prompt text or batch text.
|
|
response: Raw response text or joined batch responses.
|
|
error_message: Optional failure message.
|
|
extra_values: Optional additional field values to persist.
|
|
"""
|
|
self.ensure_one()
|
|
field_map = self._MTG_OPENAI_RUN_FIELD_MAP[run_key]
|
|
values = {
|
|
field_map["state"]: state,
|
|
field_map["error"]: error_message or False,
|
|
field_map["analyzed_at"]: fields.Datetime.now(),
|
|
field_map["model"]: model_name,
|
|
field_map["lang"]: analysis_lang,
|
|
field_map["prompt"]: prompt or False,
|
|
field_map["response"]: response or False,
|
|
}
|
|
if extra_values:
|
|
values.update(extra_values)
|
|
self.write(values)
|
|
|
|
@staticmethod
|
|
def _mtg_openai_build_faces_payload(card):
|
|
"""Serialize all explicit MTG card faces for AI prompts.
|
|
|
|
Args:
|
|
card: MTG card record in a stable language context.
|
|
|
|
Returns:
|
|
list[dict[str, object]]: Ordered face payloads.
|
|
"""
|
|
return [
|
|
{
|
|
"name": section["name"],
|
|
"mana_cost": section["mana_cost"],
|
|
"type_line": section["type_line"],
|
|
"oracle_text": section["oracle_text"],
|
|
}
|
|
for section in card.mtg_get_rules_sections()
|
|
]
|
|
|
|
def _mtg_openai_build_rules_payload(self, card):
|
|
"""Return one reusable rules payload for prompts.
|
|
|
|
Args:
|
|
card: MTG card record, in any language context.
|
|
|
|
Returns:
|
|
dict[str, object]: Face-aware rules payload built from English text.
|
|
"""
|
|
self.ensure_one()
|
|
english_card = card.with_context(lang="en_US")
|
|
return {
|
|
"oracle_text": english_card.mtg_get_rules_summary(),
|
|
"face_count": english_card.mtg_face_count,
|
|
"faces": self._mtg_openai_build_faces_payload(english_card),
|
|
}
|
|
|
|
def _mtg_openai_build_card_reference_payload(self, card):
|
|
"""Return one reusable English card payload for AI prompts."""
|
|
self.ensure_one()
|
|
english_card = card.with_context(lang="en_US")
|
|
return {
|
|
"name": english_card.display_name,
|
|
"mana_cost": english_card.mtg_mana_cost or False,
|
|
"mana_value": english_card.mtg_mana_value or 0.0,
|
|
"type_line": english_card.mtg_type_line or False,
|
|
"set": english_card.mtg_set_id.display_name if english_card.mtg_set_id else False,
|
|
"collector_number": english_card.mtg_collector_number or False,
|
|
**self._mtg_openai_build_rules_payload(english_card),
|
|
}
|
|
|
|
def _mtg_openai_build_prompt(self, analysis_lang):
|
|
"""Build the full analysis prompt for one MTG deck.
|
|
|
|
Args:
|
|
analysis_lang: Target output language code.
|
|
|
|
Returns:
|
|
str: Final prompt sent to the OpenAI Responses API.
|
|
"""
|
|
self.ensure_one()
|
|
analysis_language_name = self._mtg_openai_get_analysis_language_name(
|
|
analysis_lang
|
|
)
|
|
deck_payload = {
|
|
**self._mtg_openai_get_deck_identity_payload(),
|
|
"metrics": self._mtg_openai_get_metric_payload(),
|
|
"boards": self._mtg_openai_collect_board_payload(),
|
|
}
|
|
return (
|
|
"Analyze this Magic: The Gathering deck as an experienced Commander deck builder. "
|
|
"Explain how the deck functions from a pilot perspective. Focus on the actual card pool, "
|
|
"the commander, the mana curve, synergy packages, likely play patterns, and how the deck "
|
|
"wins or stabilizes. The first paragraph should work as a concise short description of the deck. "
|
|
"Do not invent cards or text. If something is uncertain, say so briefly. "
|
|
f"Write every value in the JSON response in {analysis_language_name}. "
|
|
"The provided oracle_text values are intentionally in English and must be treated as the authoritative rules text input. "
|
|
"Multi-face cards may also include a faces array with the authoritative English rules text for each printed face. "
|
|
"Treat those cards as one deck slot that can offer multiple spell options.\n\n"
|
|
"Return JSON with exactly this shape:\n"
|
|
"{\n"
|
|
' "commander_summary": "2-3 short paragraphs in plain text that explain how the deck works. The first paragraph must read like a compact deck blurb.",\n'
|
|
' "gameplan_bullets": ["3-6 concise bullets"],\n'
|
|
' "pilot_tips": ["3-6 concise bullets"],\n'
|
|
' "risk_notes": ["2-5 concise bullets"]\n'
|
|
"}\n\n"
|
|
"Deck data:\n"
|
|
f"{json.dumps(deck_payload, ensure_ascii=False, indent=2)}"
|
|
)
|
|
|
|
def _mtg_openai_build_role_prompt(self, analysis_lang, role_map, batch_payload):
|
|
"""Build one prompt that classifies role tags for deck lines.
|
|
|
|
Args:
|
|
analysis_lang: Target output language code.
|
|
role_map: Mapping from stable role keys to deck-role records.
|
|
batch_payload: Deck-line batch to classify.
|
|
|
|
Returns:
|
|
str: Final prompt for one role-analysis batch.
|
|
"""
|
|
analysis_language_name = self._mtg_openai_get_analysis_language_name(
|
|
analysis_lang
|
|
)
|
|
role_payload = [
|
|
{
|
|
"key": role_key,
|
|
"name": role_record.display_name,
|
|
"note": role_record.note or False,
|
|
}
|
|
for role_key, role_record in role_map.items()
|
|
]
|
|
return (
|
|
"Analyze these Magic: The Gathering Commander deck cards and assign deckbuilding roles. "
|
|
"Use only the allowed role keys. A card may have zero, one, or multiple roles. "
|
|
"Prefer precise role tags over broad tagging. Do not invent cards or rules text. "
|
|
f"Write every rationale in {analysis_language_name}. "
|
|
"The provided oracle_text values are intentionally in English and must be treated as the authoritative rules text input. "
|
|
"Multi-face cards may also include a faces array with the authoritative English rules text for each printed face. "
|
|
"Treat those cards as one deck slot that can offer multiple spell options.\n\n"
|
|
"Return JSON with exactly this shape:\n"
|
|
"{\n"
|
|
' "cards": [\n'
|
|
" {\n"
|
|
' "line_id": 123,\n'
|
|
' "role_keys": ["ramp", "value"],\n'
|
|
' "rationale": "One short sentence."\n'
|
|
" }\n"
|
|
" ]\n"
|
|
"}\n\n"
|
|
"Every input line_id must appear exactly once in the cards array.\n\n"
|
|
"Allowed roles:\n"
|
|
f"{json.dumps(role_payload, ensure_ascii=False, indent=2)}\n\n"
|
|
"Cards to classify:\n"
|
|
f"{json.dumps(batch_payload, ensure_ascii=False, indent=2)}"
|
|
)
|
|
|
|
def _mtg_openai_build_alternative_prompt(self, analysis_lang, issue_payload):
|
|
"""Build the prompt that suggests replacements for problematic lines."""
|
|
self.ensure_one()
|
|
analysis_language_name = self._mtg_openai_get_analysis_language_name(
|
|
analysis_lang
|
|
)
|
|
deck_payload = {
|
|
**self._mtg_openai_get_deck_identity_payload(),
|
|
"issues": issue_payload,
|
|
}
|
|
return (
|
|
"You are helping to repair a Magic: The Gathering Commander deck. "
|
|
"For each problematic deck line, suggest 2 to 3 replacement cards from the provided in-system candidate pool. "
|
|
"Keep the deck inside the active commander color identity and avoid singleton conflicts. "
|
|
"Prefer candidates that preserve the card's likely job in the deck, such as ramp, draw, removal, or threat density. "
|
|
"Only use candidate card_ids listed under each issue line. "
|
|
f"Write every explanation in {analysis_language_name}. "
|
|
"The provided oracle_text values are intentionally in English and must be treated as the authoritative rules text input. "
|
|
"Multi-face cards may include a faces array and should still be treated as one card slot.\n\n"
|
|
"Return JSON with exactly this shape:\n"
|
|
"{\n"
|
|
' "alternatives": [\n'
|
|
" {\n"
|
|
' "line_id": 123,\n'
|
|
' "summary": "One short sentence.",\n'
|
|
' "suggestions": [\n'
|
|
" {\n"
|
|
' "card_id": 456,\n'
|
|
' "reason": "One short sentence."\n'
|
|
" }\n"
|
|
" ]\n"
|
|
" }\n"
|
|
" ]\n"
|
|
"}\n\n"
|
|
"Every input line_id must appear exactly once. If no useful replacement exists for a line, return an empty suggestions list for that line.\n\n"
|
|
"Deck repair payload:\n"
|
|
f"{json.dumps(deck_payload, ensure_ascii=False, indent=2)}"
|
|
)
|
|
|
|
def _mtg_openai_build_fill_prompt(self, analysis_lang, candidate_payload, batch_size):
|
|
"""Build the prompt that fills missing mainboard slots."""
|
|
self.ensure_one()
|
|
analysis_language_name = self._mtg_openai_get_analysis_language_name(
|
|
analysis_lang
|
|
)
|
|
deck_payload = {
|
|
**self._mtg_openai_get_deck_identity_payload(),
|
|
"current_metrics": {
|
|
**self._mtg_openai_get_metric_payload(),
|
|
"tagged_cards": self.mtg_tagged_line_count,
|
|
"untagged_cards": self.mtg_untagged_line_count,
|
|
"role_coverage_ratio": self.mtg_role_coverage_ratio,
|
|
"remaining_mainboard_slots": self.mtg_expected_mainboard_size - self.mtg_mainboard_count,
|
|
},
|
|
"candidates": candidate_payload,
|
|
}
|
|
return (
|
|
"You are finishing a Magic: The Gathering Commander deck from an in-system card pool. "
|
|
"Select the strongest additions for the next batch of missing mainboard slots. "
|
|
"Balance mana development, card flow, interaction, threats, and commander synergy. "
|
|
"Use only candidate card_ids from the provided pool. "
|
|
"Do not invent cards. "
|
|
"Use quantity values greater than 1 only when allows_multiple_copies is true. "
|
|
f"Write every explanation in {analysis_language_name}. "
|
|
"The provided oracle_text values are intentionally in English and must be treated as the authoritative rules text input. "
|
|
"Multi-face cards may include a faces array and should still be treated as one card slot.\n\n"
|
|
"Return JSON with exactly this shape:\n"
|
|
"{\n"
|
|
' "summary": "One short paragraph.",\n'
|
|
' "cards": [\n'
|
|
" {\n"
|
|
' "card_id": 456,\n'
|
|
' "quantity": 1,\n'
|
|
' "reason": "One short sentence."\n'
|
|
" }\n"
|
|
" ]\n"
|
|
"}\n\n"
|
|
f"Select at most {batch_size} total cards in this batch.\n\n"
|
|
"Deck fill payload:\n"
|
|
f"{json.dumps(deck_payload, ensure_ascii=False, indent=2)}"
|
|
)
|
|
|
|
def _mtg_openai_get_deck_identity_payload(self):
|
|
"""Return the stable deck identity payload reused across prompts.
|
|
|
|
Returns:
|
|
dict[str, object]: Deck name, format, commander, and color identity.
|
|
"""
|
|
self.ensure_one()
|
|
return {
|
|
"deck_name": self.name,
|
|
"format": self.mtg_format_id.display_name or False,
|
|
"commander": self.mtg_commander_card_id.display_name or False,
|
|
"color_identity": {
|
|
"name": self.mtg_color_identity_name or False,
|
|
"signature": self.mtg_color_identity_signature or False,
|
|
},
|
|
}
|
|
|
|
def _mtg_openai_get_metric_payload(self):
|
|
"""Return the stable MTG metric payload reused across prompts.
|
|
|
|
Returns:
|
|
dict[str, object]: Current deck metric snapshot.
|
|
"""
|
|
self.ensure_one()
|
|
return {
|
|
"mainboard_count": self.mtg_mainboard_count,
|
|
"command_zone_count": self.mtg_command_zone_count,
|
|
"sideboard_count": self.mtg_sideboard_count,
|
|
"maybeboard_count": self.mtg_maybeboard_count,
|
|
"average_mana_value": self.mtg_average_mana_value,
|
|
"land_count": self.mtg_land_count,
|
|
"creature_count": self.mtg_creature_count,
|
|
"artifact_count": self.mtg_artifact_count,
|
|
"enchantment_count": self.mtg_enchantment_count,
|
|
"planeswalker_count": self.mtg_planeswalker_count,
|
|
"instant_count": self.mtg_instant_count,
|
|
"sorcery_count": self.mtg_sorcery_count,
|
|
}
|
|
|
|
def _mtg_openai_collect_board_payload(self):
|
|
"""Collect a structured board snapshot for the OpenAI prompt.
|
|
|
|
Oracle text is read explicitly in ``en_US`` to keep the rules input
|
|
stable even when the active user works in another language.
|
|
|
|
Returns:
|
|
list[dict[str, object]]: Board and card payloads for the prompt.
|
|
"""
|
|
self.ensure_one()
|
|
board_payload = []
|
|
for board in self.board_ids.sorted(
|
|
lambda current_board: (current_board.sequence, current_board.id)
|
|
):
|
|
if not board.include_in_total and board.code != "command_zone":
|
|
continue
|
|
card_payloads = []
|
|
for line in board.line_ids.sorted(
|
|
lambda current_line: (
|
|
current_line.board_sequence,
|
|
getattr(current_line, "mtg_mana_value", 0.0),
|
|
current_line.card_id.display_name or "",
|
|
current_line.id,
|
|
)
|
|
):
|
|
card_payload = self._mtg_openai_build_card_reference_payload(line.card_id)
|
|
if (
|
|
card_payload["face_count"] <= 1
|
|
and card_payload["type_line"]
|
|
and "Land" in card_payload["type_line"]
|
|
):
|
|
card_payload["oracle_text"] = False
|
|
card_payloads.append(
|
|
{
|
|
"quantity": line.quantity,
|
|
"roles": line.role_ids.sorted(
|
|
key=lambda role: (role.sequence, role.name or "", role.id)
|
|
).mapped("name"),
|
|
**card_payload,
|
|
}
|
|
)
|
|
board_payload.append(
|
|
{
|
|
"name": board.name,
|
|
"code": board.code,
|
|
"include_in_total": board.include_in_total,
|
|
"cards": card_payloads,
|
|
}
|
|
)
|
|
return board_payload
|
|
|
|
def _mtg_openai_collect_role_payload(self):
|
|
"""Collect deck-line payloads for role classification.
|
|
|
|
Returns:
|
|
list[dict[str, object]]: Structured deck-line payloads.
|
|
"""
|
|
self.ensure_one()
|
|
payload = []
|
|
for line in self.line_ids.sorted(
|
|
lambda current_line: (
|
|
current_line.board_sequence,
|
|
getattr(current_line, "mtg_mana_value", 0.0),
|
|
current_line.card_id.display_name or "",
|
|
current_line.id,
|
|
)
|
|
):
|
|
payload.append(
|
|
{
|
|
"line_id": line.id,
|
|
"board": line.board_id.code or line.board_id.name,
|
|
"quantity": line.quantity,
|
|
**self._mtg_openai_build_card_reference_payload(line.card_id),
|
|
}
|
|
)
|
|
return payload
|
|
|
|
def _mtg_openai_collect_alternative_payload(self):
|
|
"""Collect issue lines plus per-line replacement candidates."""
|
|
self.ensure_one()
|
|
issue_payload = []
|
|
issue_lines = self.line_ids.filtered(
|
|
lambda line: line.board_id.include_in_total and line.mtg_issue_count
|
|
).sorted(
|
|
lambda current_line: (
|
|
current_line.board_sequence,
|
|
current_line.primary_role_sequence,
|
|
current_line.mtg_mana_value,
|
|
current_line.card_id.display_name or "",
|
|
current_line.id,
|
|
)
|
|
)
|
|
for line in issue_lines:
|
|
candidates = self._mtg_openai_get_alternative_candidate_cards(
|
|
issue_line=line,
|
|
limit=self._mtg_openai_get_alternative_candidate_limit(),
|
|
)
|
|
if not candidates:
|
|
continue
|
|
issue_payload.append(
|
|
{
|
|
"line_id": line.id,
|
|
"card": self._mtg_openai_build_line_payload(line),
|
|
"issues": self._mtg_openai_get_issue_labels(line),
|
|
"candidates": [
|
|
self._mtg_openai_build_card_payload(card)
|
|
for card in candidates
|
|
],
|
|
}
|
|
)
|
|
return issue_payload
|
|
|
|
def _mtg_openai_collect_fill_candidate_payload(self, limit):
|
|
"""Collect a balanced candidate pool for AI-driven deck fill."""
|
|
self.ensure_one()
|
|
candidates = self._mtg_openai_get_fill_candidate_cards(limit=limit)
|
|
return [self._mtg_openai_build_card_payload(card) for card in candidates]
|
|
|
|
def _mtg_openai_get_issue_labels(self, line):
|
|
"""Return stable issue labels for one MTG deck line."""
|
|
issue_labels = []
|
|
if line.mtg_singleton_violation:
|
|
issue_labels.append("singleton")
|
|
if line.mtg_color_identity_violation:
|
|
issue_labels.append("color_identity")
|
|
if not line.mtg_legality_ok:
|
|
issue_labels.append(line.mtg_legality_status or "illegal")
|
|
return issue_labels or ["issue"]
|
|
|
|
def _mtg_openai_build_line_payload(self, line):
|
|
"""Serialize one deck line for OpenAI prompts."""
|
|
return {
|
|
"line_id": line.id,
|
|
"board": line.board_id.code or line.board_id.name,
|
|
"quantity": line.quantity,
|
|
"roles": line.role_ids.sorted(
|
|
key=lambda role: (role.sequence, role.name or "", role.id)
|
|
).mapped("name"),
|
|
**self._mtg_openai_build_card_reference_payload(line.card_id),
|
|
}
|
|
|
|
def _mtg_openai_build_card_payload(self, card):
|
|
"""Serialize one MTG card candidate for OpenAI prompts."""
|
|
english_card = card.with_context(lang="en_US")
|
|
primary_types = english_card.mtg_card_type_ids.sorted(
|
|
key=lambda card_type: (card_type.sequence, card_type.name or "", card_type.id)
|
|
).mapped("name")
|
|
legality_status = self.env["mvd.tcg.deck.line"]._mtg_get_card_legality_status(
|
|
english_card,
|
|
self.mtg_format_id.code or "",
|
|
)
|
|
return {
|
|
"card_id": english_card.id,
|
|
"color_identity": {
|
|
"signature": english_card.mtg_color_identity_signature or False,
|
|
"colors": english_card.mtg_color_identity_ids.sorted(
|
|
key=lambda color: (color.sequence, color.code or "", color.id)
|
|
).mapped("name"),
|
|
},
|
|
**self._mtg_openai_build_card_reference_payload(english_card),
|
|
"rarity": english_card.mtg_rarity_id.display_name if english_card.mtg_rarity_id else False,
|
|
"types": primary_types,
|
|
"allows_multiple_copies": english_card.mtg_allows_unlimited_copies(),
|
|
"current_quantity": self._mtg_openai_get_current_mainboard_quantity(english_card),
|
|
"legality_status": legality_status,
|
|
}
|
|
|
|
def _mtg_openai_get_current_mainboard_quantity(self, card):
|
|
"""Return the current mainboard quantity for one card record."""
|
|
self.ensure_one()
|
|
mainboard_line = self.mtg_mainboard_line_ids.filtered(
|
|
lambda line: line.card_id == card
|
|
)[:1]
|
|
return mainboard_line.quantity if mainboard_line else 0
|
|
|
|
def _mtg_openai_get_existing_singleton_aliases(self):
|
|
"""Return singleton aliases currently present in included MTG boards."""
|
|
self.ensure_one()
|
|
aliases = set()
|
|
for card in self.line_ids.filtered(
|
|
lambda line: line.board_id.include_in_total and line.card_id.game_id.code == "mtg"
|
|
).mapped("card_id").with_context(lang="en_US"):
|
|
aliases.update(card.mtg_get_singleton_key_aliases())
|
|
return aliases
|
|
|
|
def _mtg_openai_identity_allows_card(self, card):
|
|
"""Return whether one card fits the current commander identity."""
|
|
self.ensure_one()
|
|
commander_identity = set(self.mtg_color_identity_signature or "")
|
|
if not commander_identity:
|
|
return True
|
|
return not (set(card.mtg_color_identity_signature or "") - commander_identity)
|
|
|
|
def _mtg_openai_is_legal_candidate(self, card):
|
|
"""Return whether one card can be considered for the active format."""
|
|
legality_status = self.env["mvd.tcg.deck.line"]._mtg_get_card_legality_status(
|
|
card,
|
|
self.mtg_format_id.code or "",
|
|
)
|
|
return legality_status not in {"banned", "not_legal"}
|
|
|
|
def _mtg_openai_get_base_fill_candidates(self):
|
|
"""Return the coarse MTG candidate pool for deck fill."""
|
|
self.ensure_one()
|
|
card_model = self.env["mvd.tcg.card"].with_context(lang="en_US")
|
|
existing_aliases = self._mtg_openai_get_existing_singleton_aliases()
|
|
candidates = card_model.browse()
|
|
for card in card_model.search(
|
|
[
|
|
("game_id.code", "=", "mtg"),
|
|
("active", "=", True),
|
|
("mtg_is_token", "=", False),
|
|
("mtg_is_digital", "=", False),
|
|
],
|
|
order="mtg_collector_sort_key, id",
|
|
):
|
|
if not self._mtg_openai_identity_allows_card(card):
|
|
continue
|
|
if not self._mtg_openai_is_legal_candidate(card):
|
|
continue
|
|
if not card.mtg_allows_unlimited_copies() and (
|
|
existing_aliases & set(card.mtg_get_singleton_key_aliases())
|
|
):
|
|
continue
|
|
candidates |= card
|
|
return candidates
|
|
|
|
def _mtg_openai_get_fill_candidate_cards(self, limit):
|
|
"""Return a balanced, size-limited MTG candidate pool for deck fill."""
|
|
self.ensure_one()
|
|
type_priority = (
|
|
"land",
|
|
"creature",
|
|
"artifact",
|
|
"enchantment",
|
|
"instant",
|
|
"sorcery",
|
|
"planeswalker",
|
|
"other",
|
|
)
|
|
grouped_candidates = {type_code: [] for type_code in type_priority}
|
|
for card in self._mtg_openai_get_base_fill_candidates():
|
|
type_codes = set(card.mtg_card_type_ids.mapped("code"))
|
|
if "land" in type_codes:
|
|
grouped_candidates["land"].append(card.id)
|
|
elif "creature" in type_codes:
|
|
grouped_candidates["creature"].append(card.id)
|
|
elif "artifact" in type_codes:
|
|
grouped_candidates["artifact"].append(card.id)
|
|
elif "enchantment" in type_codes:
|
|
grouped_candidates["enchantment"].append(card.id)
|
|
elif "instant" in type_codes:
|
|
grouped_candidates["instant"].append(card.id)
|
|
elif "sorcery" in type_codes:
|
|
grouped_candidates["sorcery"].append(card.id)
|
|
elif "planeswalker" in type_codes:
|
|
grouped_candidates["planeswalker"].append(card.id)
|
|
else:
|
|
grouped_candidates["other"].append(card.id)
|
|
|
|
ordered_ids = []
|
|
indices = {type_code: 0 for type_code in type_priority}
|
|
while len(ordered_ids) < limit:
|
|
did_append = False
|
|
for type_code in type_priority:
|
|
bucket = grouped_candidates[type_code]
|
|
bucket_index = indices[type_code]
|
|
if bucket_index >= len(bucket):
|
|
continue
|
|
ordered_ids.append(bucket[bucket_index])
|
|
indices[type_code] += 1
|
|
did_append = True
|
|
if len(ordered_ids) >= limit:
|
|
break
|
|
if not did_append:
|
|
break
|
|
return self.env["mvd.tcg.card"].browse(ordered_ids)
|
|
|
|
def _mtg_openai_get_alternative_candidate_cards(self, issue_line, limit):
|
|
"""Return a narrow replacement pool for one problematic line."""
|
|
self.ensure_one()
|
|
target_card = issue_line.card_id.with_context(lang="en_US")
|
|
target_types = set(target_card.mtg_card_type_ids.mapped("code"))
|
|
target_mana_value = target_card.mtg_mana_value or 0.0
|
|
existing_aliases = self._mtg_openai_get_existing_singleton_aliases()
|
|
candidates = self.env["mvd.tcg.card"].browse()
|
|
fallback_candidates = self.env["mvd.tcg.card"].browse()
|
|
for card in self._mtg_openai_get_base_fill_candidates():
|
|
if card == target_card:
|
|
continue
|
|
if existing_aliases & set(card.mtg_get_singleton_key_aliases()):
|
|
continue
|
|
card_types = set(card.mtg_card_type_ids.mapped("code"))
|
|
mana_gap = abs((card.mtg_mana_value or 0.0) - target_mana_value)
|
|
if target_types and card_types and target_types & card_types:
|
|
if mana_gap <= 3.0:
|
|
candidates |= card
|
|
else:
|
|
fallback_candidates |= card
|
|
else:
|
|
fallback_candidates |= card
|
|
ordered_candidates = (candidates | fallback_candidates).sorted(
|
|
key=lambda card: (
|
|
0
|
|
if target_types & set(card.mtg_card_type_ids.mapped("code"))
|
|
else 1,
|
|
abs((card.mtg_mana_value or 0.0) - target_mana_value),
|
|
card.mtg_mana_value or 0.0,
|
|
card.display_name or "",
|
|
card.id,
|
|
)
|
|
)
|
|
return ordered_candidates[:limit]
|
|
|
|
def _mtg_openai_add_card_to_board(self, card, board, quantity=1):
|
|
"""Create or increment one deck line in a validated way."""
|
|
self.ensure_one()
|
|
line_model = self.env["mvd.tcg.deck.line"]
|
|
existing_line = line_model.search(
|
|
[
|
|
("board_id", "=", board.id),
|
|
("card_id", "=", card.id),
|
|
],
|
|
limit=1,
|
|
)
|
|
self._mvd_tcg_validate_add_to_board(
|
|
card,
|
|
board,
|
|
quantity=quantity,
|
|
existing_line=existing_line,
|
|
)
|
|
if existing_line:
|
|
existing_line.write({"quantity": existing_line.quantity + quantity})
|
|
return existing_line
|
|
return line_model.create(
|
|
{
|
|
"board_id": board.id,
|
|
"quantity": quantity,
|
|
"card_id": card.id,
|
|
}
|
|
)
|
|
|
|
def _mtg_openai_get_role_key_map(self):
|
|
"""Return the supported deck-role records keyed by stable AI keys.
|
|
|
|
Returns:
|
|
dict[str, Model]: Stable role-key mapping.
|
|
"""
|
|
role_xmlids = {
|
|
"ramp": "mvd_tcg_deck.mvd_tcg_deck_role_ramp",
|
|
"draw": "mvd_tcg_deck.mvd_tcg_deck_role_draw",
|
|
"removal": "mvd_tcg_deck.mvd_tcg_deck_role_removal",
|
|
"interaction": "mvd_tcg_deck.mvd_tcg_deck_role_interaction",
|
|
"protection": "mvd_tcg_deck.mvd_tcg_deck_role_protection",
|
|
"wincon": "mvd_tcg_deck.mvd_tcg_deck_role_wincon",
|
|
"value": "mvd_tcg_deck.mvd_tcg_deck_role_value",
|
|
"combo": "mvd_tcg_deck.mvd_tcg_deck_role_combo",
|
|
}
|
|
role_keys = list(role_xmlids)
|
|
role_model = self.env["mvd.tcg.deck.role"]
|
|
roles = role_model.search([("technical_key", "in", role_keys)])
|
|
role_map = {role.technical_key: role for role in roles}
|
|
missing_role_keys = [role_key for role_key in role_keys if role_key not in role_map]
|
|
if missing_role_keys:
|
|
for role_key in list(missing_role_keys):
|
|
fallback_role = self.env.ref(role_xmlids[role_key], raise_if_not_found=False)
|
|
if not fallback_role:
|
|
continue
|
|
role_map[role_key] = fallback_role
|
|
missing_role_keys = [
|
|
role_key for role_key in role_keys if role_key not in role_map
|
|
]
|
|
if missing_role_keys:
|
|
raise UserError(
|
|
_(
|
|
"Missing configured deck roles for: %s"
|
|
)
|
|
% ", ".join(missing_role_keys)
|
|
)
|
|
return role_map
|
|
|
|
def _mtg_openai_prepare_alternative_suggestions(self, issue_payload, alternative_payload):
|
|
"""Validate and normalize alternative suggestions returned by OpenAI."""
|
|
expected_ids = {item["line_id"] for item in issue_payload}
|
|
candidate_ids_by_line = {
|
|
item["line_id"]: {candidate["card_id"] for candidate in item["candidates"]}
|
|
for item in issue_payload
|
|
}
|
|
issue_map = {item["line_id"]: item for item in issue_payload}
|
|
prepared = []
|
|
seen_ids = set()
|
|
for item in alternative_payload.get("alternatives") or []:
|
|
line_id = item.get("line_id")
|
|
if line_id not in expected_ids:
|
|
raise UserError(
|
|
_("OpenAI returned alternatives for an unknown deck line.")
|
|
)
|
|
if line_id in seen_ids:
|
|
raise UserError(
|
|
_("OpenAI returned duplicate alternative rows for one deck line.")
|
|
)
|
|
seen_ids.add(line_id)
|
|
allowed_candidate_ids = candidate_ids_by_line[line_id]
|
|
suggestions = []
|
|
seen_candidate_ids = set()
|
|
for suggestion in item.get("suggestions") or []:
|
|
card_id = suggestion.get("card_id")
|
|
if card_id not in allowed_candidate_ids:
|
|
raise UserError(
|
|
_("OpenAI returned an unsupported replacement candidate.")
|
|
)
|
|
if card_id in seen_candidate_ids:
|
|
continue
|
|
seen_candidate_ids.add(card_id)
|
|
candidate = self.env["mvd.tcg.card"].browse(card_id).exists()
|
|
if not candidate:
|
|
continue
|
|
suggestions.append(
|
|
{
|
|
"card_id": candidate.id,
|
|
"name": candidate.display_name,
|
|
"mana_cost": candidate.mtg_mana_cost or False,
|
|
"type_line": candidate.mtg_type_line or False,
|
|
"reason": (suggestion.get("reason") or "").strip() or False,
|
|
}
|
|
)
|
|
prepared.append(
|
|
{
|
|
"line_id": line_id,
|
|
"card_name": issue_map[line_id]["card"]["name"],
|
|
"issues": issue_map[line_id]["issues"],
|
|
"summary": (item.get("summary") or "").strip() or False,
|
|
"suggestions": suggestions,
|
|
}
|
|
)
|
|
if seen_ids != expected_ids:
|
|
raise UserError(
|
|
_("OpenAI did not return alternative rows for every issue line.")
|
|
)
|
|
return prepared
|
|
|
|
def _mtg_openai_prepare_fill_selection(self, candidate_payload, fill_payload, max_cards):
|
|
"""Validate and normalize one batch of fill-card selections."""
|
|
allowed_candidates = {
|
|
item["card_id"]: item for item in candidate_payload
|
|
}
|
|
selected_cards = []
|
|
seen_card_ids = set()
|
|
total_quantity = 0
|
|
for item in fill_payload.get("cards") or []:
|
|
card_id = item.get("card_id")
|
|
candidate = allowed_candidates.get(card_id)
|
|
if not candidate:
|
|
raise UserError(_("OpenAI returned a card outside the allowed pool."))
|
|
if card_id in seen_card_ids:
|
|
raise UserError(_("OpenAI returned the same fill card more than once."))
|
|
seen_card_ids.add(card_id)
|
|
quantity = int(item.get("quantity") or 1)
|
|
if quantity <= 0:
|
|
raise UserError(_("OpenAI returned an invalid fill quantity."))
|
|
if quantity > 1 and not candidate["allows_multiple_copies"]:
|
|
raise UserError(
|
|
_("OpenAI attempted to add multiple copies of a singleton card.")
|
|
)
|
|
total_quantity += quantity
|
|
if total_quantity > max_cards:
|
|
raise UserError(_("OpenAI attempted to add too many cards in one batch."))
|
|
selected_cards.append(
|
|
{
|
|
"card_id": card_id,
|
|
"quantity": quantity,
|
|
"name": candidate["name"],
|
|
"mana_cost": candidate["mana_cost"],
|
|
"type_line": candidate["type_line"],
|
|
"reason": (item.get("reason") or "").strip() or False,
|
|
}
|
|
)
|
|
return {
|
|
"summary": (fill_payload.get("summary") or "").strip() or False,
|
|
"cards": selected_cards,
|
|
}
|
|
|
|
def _mtg_openai_prepare_role_updates(self, batch_payload, role_payload, role_map):
|
|
"""Validate one role-analysis batch and convert it into line writes.
|
|
|
|
Args:
|
|
batch_payload: Input card payloads for one batch.
|
|
role_payload: Parsed JSON payload returned by OpenAI.
|
|
role_map: Supported role records keyed by stable AI keys.
|
|
|
|
Returns:
|
|
list[dict[str, object]]: Line updates ready for ORM writes.
|
|
"""
|
|
expected_ids = {item["line_id"] for item in batch_payload}
|
|
cards = role_payload.get("cards") or []
|
|
seen_ids = set()
|
|
updates = []
|
|
for item in cards:
|
|
line_id = item.get("line_id")
|
|
if line_id not in expected_ids:
|
|
raise UserError(
|
|
_("OpenAI returned a role analysis for an unknown deck line.")
|
|
)
|
|
if line_id in seen_ids:
|
|
raise UserError(
|
|
_("OpenAI returned duplicate role analysis rows for one deck line.")
|
|
)
|
|
role_ids = []
|
|
for role_key in item.get("role_keys") or []:
|
|
role_record = role_map.get(role_key)
|
|
if not role_record:
|
|
raise UserError(
|
|
_(
|
|
"OpenAI returned an unsupported role key: %(role_key)s",
|
|
role_key=role_key,
|
|
)
|
|
)
|
|
role_ids.append(role_record.id)
|
|
seen_ids.add(line_id)
|
|
updates.append(
|
|
{
|
|
"line_id": line_id,
|
|
"role_ids": role_ids,
|
|
"rationale": (item.get("rationale") or "").strip() or False,
|
|
}
|
|
)
|
|
if seen_ids != expected_ids:
|
|
raise UserError(
|
|
_("OpenAI did not return role assignments for every deck line.")
|
|
)
|
|
return updates
|
|
|
|
def _mtg_openai_request_analysis(self, api_key, model_name, prompt):
|
|
"""Call the OpenAI Responses API and parse the JSON analysis payload.
|
|
|
|
Args:
|
|
api_key: OpenAI API key.
|
|
model_name: OpenAI model identifier.
|
|
prompt: Final analysis prompt.
|
|
|
|
Returns:
|
|
tuple[dict[str, object], str]: Parsed analysis JSON and raw response.
|
|
"""
|
|
schema = {
|
|
"type": "object",
|
|
"additionalProperties": False,
|
|
"properties": {
|
|
"commander_summary": {"type": "string"},
|
|
"gameplan_bullets": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
},
|
|
"pilot_tips": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
},
|
|
"risk_notes": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
},
|
|
},
|
|
"required": [
|
|
"commander_summary",
|
|
"gameplan_bullets",
|
|
"pilot_tips",
|
|
"risk_notes",
|
|
],
|
|
}
|
|
return self._mtg_openai_request_structured_payload(
|
|
api_key=api_key,
|
|
model_name=model_name,
|
|
prompt=prompt,
|
|
system_instruction=(
|
|
"You are an expert Magic: The Gathering Commander analyst. "
|
|
"You explain how decks function, how they sequence, and where "
|
|
"their strengths and risks lie."
|
|
),
|
|
response_name="mvd_tcg_mtg_commander_analysis",
|
|
schema=schema,
|
|
max_output_tokens=3200,
|
|
)
|
|
|
|
def _mtg_openai_request_role_analysis(self, api_key, model_name, prompt):
|
|
"""Call the OpenAI Responses API for card-role classification.
|
|
|
|
Args:
|
|
api_key: OpenAI API key.
|
|
model_name: OpenAI model identifier.
|
|
prompt: Final role-analysis prompt.
|
|
|
|
Returns:
|
|
tuple[dict[str, object], str]: Parsed role JSON and raw response.
|
|
"""
|
|
schema = {
|
|
"type": "object",
|
|
"additionalProperties": False,
|
|
"properties": {
|
|
"cards": {
|
|
"type": "array",
|
|
"items": {
|
|
"type": "object",
|
|
"additionalProperties": False,
|
|
"properties": {
|
|
"line_id": {"type": "integer"},
|
|
"role_keys": {
|
|
"type": "array",
|
|
"items": {
|
|
"type": "string",
|
|
"enum": [
|
|
"ramp",
|
|
"draw",
|
|
"removal",
|
|
"interaction",
|
|
"protection",
|
|
"wincon",
|
|
"value",
|
|
"combo",
|
|
],
|
|
},
|
|
},
|
|
"rationale": {"type": "string"},
|
|
},
|
|
"required": ["line_id", "role_keys", "rationale"],
|
|
},
|
|
}
|
|
},
|
|
"required": ["cards"],
|
|
}
|
|
return self._mtg_openai_request_structured_payload(
|
|
api_key=api_key,
|
|
model_name=model_name,
|
|
prompt=prompt,
|
|
system_instruction=(
|
|
"You are an expert Magic: The Gathering Commander deckbuilding analyst. "
|
|
"You classify cards into practical deckbuilding roles such as ramp, draw, "
|
|
"interaction, protection, win condition, value, and combo."
|
|
),
|
|
response_name="mvd_tcg_mtg_card_role_analysis",
|
|
schema=schema,
|
|
max_output_tokens=6400,
|
|
)
|
|
|
|
def _mtg_openai_request_alternative_analysis(self, api_key, model_name, prompt):
|
|
"""Call the Responses API for issue-line replacement suggestions."""
|
|
schema = {
|
|
"type": "object",
|
|
"additionalProperties": False,
|
|
"properties": {
|
|
"alternatives": {
|
|
"type": "array",
|
|
"items": {
|
|
"type": "object",
|
|
"additionalProperties": False,
|
|
"properties": {
|
|
"line_id": {"type": "integer"},
|
|
"summary": {"type": "string"},
|
|
"suggestions": {
|
|
"type": "array",
|
|
"items": {
|
|
"type": "object",
|
|
"additionalProperties": False,
|
|
"properties": {
|
|
"card_id": {"type": "integer"},
|
|
"reason": {"type": "string"},
|
|
},
|
|
"required": ["card_id", "reason"],
|
|
},
|
|
},
|
|
},
|
|
"required": ["line_id", "summary", "suggestions"],
|
|
},
|
|
}
|
|
},
|
|
"required": ["alternatives"],
|
|
}
|
|
return self._mtg_openai_request_structured_payload(
|
|
api_key=api_key,
|
|
model_name=model_name,
|
|
prompt=prompt,
|
|
system_instruction=(
|
|
"You are an expert Magic: The Gathering Commander deck builder. "
|
|
"You repair illegal or awkward deck lines by proposing replacement cards "
|
|
"that preserve deck function, mana curve, and commander synergy."
|
|
),
|
|
response_name="mvd_tcg_mtg_alternative_suggestions",
|
|
schema=schema,
|
|
max_output_tokens=7200,
|
|
)
|
|
|
|
def _mtg_openai_request_fill_selection(self, api_key, model_name, prompt):
|
|
"""Call the Responses API for a batch of deck-fill card selections."""
|
|
schema = {
|
|
"type": "object",
|
|
"additionalProperties": False,
|
|
"properties": {
|
|
"summary": {"type": "string"},
|
|
"cards": {
|
|
"type": "array",
|
|
"items": {
|
|
"type": "object",
|
|
"additionalProperties": False,
|
|
"properties": {
|
|
"card_id": {"type": "integer"},
|
|
"quantity": {"type": "integer"},
|
|
"reason": {"type": "string"},
|
|
},
|
|
"required": ["card_id", "quantity", "reason"],
|
|
},
|
|
},
|
|
},
|
|
"required": ["summary", "cards"],
|
|
}
|
|
return self._mtg_openai_request_structured_payload(
|
|
api_key=api_key,
|
|
model_name=model_name,
|
|
prompt=prompt,
|
|
system_instruction=(
|
|
"You are an expert Magic: The Gathering Commander deck builder. "
|
|
"You complete partial decks from a bounded in-system card pool while keeping "
|
|
"mana, interaction, draw, threat density, and commander synergy in balance."
|
|
),
|
|
response_name="mvd_tcg_mtg_fill_selection",
|
|
schema=schema,
|
|
max_output_tokens=9600,
|
|
)
|
|
|
|
def _mtg_openai_request_structured_payload(
|
|
self,
|
|
api_key,
|
|
model_name,
|
|
prompt,
|
|
system_instruction,
|
|
response_name,
|
|
schema,
|
|
max_output_tokens,
|
|
):
|
|
"""Call the Responses API for one structured JSON response.
|
|
|
|
Args:
|
|
api_key: OpenAI API key.
|
|
model_name: OpenAI model identifier.
|
|
prompt: Final user prompt.
|
|
system_instruction: System message content.
|
|
response_name: Stable JSON schema name.
|
|
schema: JSON schema definition.
|
|
max_output_tokens: Output token limit for the response.
|
|
|
|
Returns:
|
|
tuple[dict[str, object], str]: Parsed JSON payload and raw response.
|
|
"""
|
|
request_payload = {
|
|
"model": model_name,
|
|
"reasoning": {"effort": "low"},
|
|
"input": [
|
|
{
|
|
"role": "system",
|
|
"content": [
|
|
{
|
|
"type": "input_text",
|
|
"text": system_instruction,
|
|
}
|
|
],
|
|
},
|
|
{
|
|
"role": "user",
|
|
"content": [{"type": "input_text", "text": prompt}],
|
|
},
|
|
],
|
|
"text": {
|
|
"verbosity": "low",
|
|
"format": {
|
|
"type": "json_schema",
|
|
"name": response_name,
|
|
"schema": schema,
|
|
"strict": True,
|
|
}
|
|
},
|
|
"max_output_tokens": max_output_tokens,
|
|
}
|
|
http_request = request.Request(
|
|
f"{self._mtg_openai_get_api_base_url()}/responses",
|
|
data=json.dumps(request_payload).encode("utf-8"),
|
|
headers={
|
|
"Authorization": f"Bearer {api_key}",
|
|
"Content-Type": "application/json",
|
|
},
|
|
method="POST",
|
|
)
|
|
response_body = False
|
|
for attempt in range(1, 4):
|
|
try:
|
|
with request.urlopen(
|
|
http_request,
|
|
timeout=self._mtg_openai_get_request_timeout_seconds(),
|
|
) as response:
|
|
response_body = response.read().decode("utf-8")
|
|
break
|
|
except error.HTTPError as exc:
|
|
error_body = exc.read().decode("utf-8", errors="replace")
|
|
if exc.code in {429, 500, 502, 503, 504} and attempt < 3:
|
|
time.sleep(attempt * 2)
|
|
continue
|
|
raise UserError(
|
|
f"OpenAI request failed with status {exc.code}: {error_body}"
|
|
) from exc
|
|
except error.URLError as exc:
|
|
if attempt < 3:
|
|
time.sleep(attempt * 2)
|
|
continue
|
|
raise UserError(f"OpenAI request failed: {exc.reason}") from exc
|
|
|
|
response_payload = json.loads(response_body)
|
|
if response_payload.get("status") == "incomplete":
|
|
incomplete_reason = (
|
|
response_payload.get("incomplete_details", {}).get("reason")
|
|
or "unknown"
|
|
)
|
|
if incomplete_reason == "max_output_tokens":
|
|
raise UserError(
|
|
"OpenAI deck analysis hit the output token limit before it returned the final JSON."
|
|
)
|
|
raise UserError(
|
|
f"OpenAI deck analysis did not complete successfully: {incomplete_reason}."
|
|
)
|
|
text_payload = cls._mtg_openai_extract_output_text(response_payload)
|
|
try:
|
|
analysis_payload = json.loads(text_payload)
|
|
except json.JSONDecodeError as exc:
|
|
raise UserError("OpenAI returned a non-JSON analysis payload.") from exc
|
|
return analysis_payload, response_body
|
|
|
|
@staticmethod
|
|
def _mtg_openai_extract_output_text(response_payload):
|
|
"""Extract the text output from one Responses API payload.
|
|
|
|
Args:
|
|
response_payload: Decoded JSON payload returned by Responses.
|
|
|
|
Returns:
|
|
str: Concatenated text output.
|
|
"""
|
|
output_text = response_payload.get("output_text")
|
|
if output_text:
|
|
return output_text
|
|
fragments = []
|
|
for output_item in response_payload.get("output", []):
|
|
if output_item.get("type") != "message":
|
|
continue
|
|
for content_item in output_item.get("content", []):
|
|
text_value = content_item.get("text")
|
|
if text_value:
|
|
fragments.append(text_value)
|
|
if fragments:
|
|
return "\n".join(fragments)
|
|
raise UserError("OpenAI returned no text output for the deck analysis.")
|
|
|
|
@staticmethod
|
|
def _mtg_openai_render_paragraphs(text_value):
|
|
"""Render plain text paragraphs into safe Odoo HTML.
|
|
|
|
Args:
|
|
text_value: Plain text paragraph content.
|
|
|
|
Returns:
|
|
str | bool: Safe HTML markup or ``False`` if empty.
|
|
"""
|
|
clean_value = (text_value or "").strip()
|
|
if not clean_value:
|
|
return False
|
|
paragraphs = [
|
|
f"<p>{html.escape(paragraph.strip())}</p>"
|
|
for paragraph in clean_value.split("\n")
|
|
if paragraph.strip()
|
|
]
|
|
return "".join(paragraphs) or False
|
|
|
|
@staticmethod
|
|
def _mtg_openai_render_bullets(items):
|
|
"""Render a list of plain-text bullets into safe Odoo HTML.
|
|
|
|
Args:
|
|
items: Bullet strings.
|
|
|
|
Returns:
|
|
str | bool: Safe HTML markup or ``False`` if empty.
|
|
"""
|
|
cleaned_items = [
|
|
html.escape((item or "").strip())
|
|
for item in (items or [])
|
|
if (item or "").strip()
|
|
]
|
|
if not cleaned_items:
|
|
return False
|
|
bullet_markup = "".join(f"<li>{item}</li>" for item in cleaned_items)
|
|
return f"<ul>{bullet_markup}</ul>"
|
|
|
|
@staticmethod
|
|
def _mtg_openai_render_alternative_suggestions(prepared_suggestions):
|
|
"""Render alternative suggestions as compact deck-analysis HTML."""
|
|
sections = []
|
|
for item in prepared_suggestions:
|
|
issue_markup = ", ".join(html.escape(issue) for issue in item["issues"])
|
|
summary_markup = (
|
|
f"<p>{html.escape(item['summary'])}</p>" if item["summary"] else ""
|
|
)
|
|
if item["suggestions"]:
|
|
suggestion_markup = ""
|
|
for suggestion in item["suggestions"]:
|
|
mana_markup = (
|
|
f" {html.escape(suggestion['mana_cost'])}"
|
|
if suggestion["mana_cost"]
|
|
else ""
|
|
)
|
|
type_markup = (
|
|
f" <span class='text-muted'>{html.escape(suggestion['type_line'])}</span>"
|
|
if suggestion["type_line"]
|
|
else ""
|
|
)
|
|
reason_markup = (
|
|
f" <span class='text-muted'>- {html.escape(suggestion['reason'])}</span>"
|
|
if suggestion["reason"]
|
|
else ""
|
|
)
|
|
suggestion_markup += (
|
|
"<li>"
|
|
f"<strong>{html.escape(suggestion['name'])}</strong>"
|
|
f"{mana_markup}{type_markup}{reason_markup}"
|
|
"</li>"
|
|
)
|
|
else:
|
|
suggestion_markup = "<li>No strong in-system replacement was found.</li>"
|
|
sections.append(
|
|
(
|
|
"<div class=\"o_mvd_tcg_openai_result_block\">"
|
|
f"<h4>{html.escape(item['card_name'])}</h4>"
|
|
f"<p><strong>Issue</strong>: {issue_markup}</p>"
|
|
f"{summary_markup}"
|
|
f"<ul>{suggestion_markup}</ul>"
|
|
"</div>"
|
|
)
|
|
)
|
|
return "".join(sections) or False
|
|
|
|
@staticmethod
|
|
def _mtg_openai_render_fill_summary(fill_batches, total_added, remaining_slots):
|
|
"""Render fill batches as compact deck-analysis HTML."""
|
|
intro = (
|
|
f"<p>Added <strong>{total_added}</strong> card(s). "
|
|
f"Remaining mainboard slots: <strong>{remaining_slots}</strong>.</p>"
|
|
)
|
|
batch_markup = []
|
|
for index, batch in enumerate(fill_batches, start=1):
|
|
summary_markup = (
|
|
f"<p>{html.escape(batch['summary'])}</p>" if batch["summary"] else ""
|
|
)
|
|
card_markup = ""
|
|
for suggestion in batch["cards"]:
|
|
mana_markup = (
|
|
f" {html.escape(suggestion['mana_cost'])}"
|
|
if suggestion["mana_cost"]
|
|
else ""
|
|
)
|
|
type_markup = (
|
|
f" <span class='text-muted'>{html.escape(suggestion['type_line'])}</span>"
|
|
if suggestion["type_line"]
|
|
else ""
|
|
)
|
|
reason_markup = (
|
|
f" <span class='text-muted'>- {html.escape(suggestion['reason'])}</span>"
|
|
if suggestion["reason"]
|
|
else ""
|
|
)
|
|
card_markup += (
|
|
"<li>"
|
|
f"<strong>{suggestion['quantity']}x {html.escape(suggestion['name'])}</strong>"
|
|
f"{mana_markup}{type_markup}{reason_markup}"
|
|
"</li>"
|
|
)
|
|
batch_markup.append(
|
|
(
|
|
"<div class=\"o_mvd_tcg_openai_result_block\">"
|
|
f"<h4>Fill Batch {index}</h4>"
|
|
f"{summary_markup}"
|
|
f"<ul>{card_markup}</ul>"
|
|
"</div>"
|
|
)
|
|
)
|
|
return intro + "".join(batch_markup)
|