"""OpenAI-powered MTG deck analysis.""" import html import json import os import time from urllib import error, parse, request from odoo import _, fields, models from odoo.exceptions import UserError from .constants import ( DEFAULT_MTG_OPENAI_API_BASE_URL, DEFAULT_MTG_OPENAI_ALTERNATIVE_CANDIDATE_LIMIT, DEFAULT_MTG_OPENAI_FILL_BATCH_SIZE, DEFAULT_MTG_OPENAI_FILL_CANDIDATE_LIMIT, DEFAULT_MTG_OPENAI_MODEL_NAME, DEFAULT_MTG_OPENAI_REQUEST_TIMEOUT_SECONDS, DEFAULT_MTG_OPENAI_ROLE_BATCH_SIZE, ) class MvdTcgDeck(models.Model): """Extend MTG decks with on-demand OpenAI analysis fields.""" _inherit = "mvd.tcg.deck" _MTG_OPENAI_ALLOWED_BASE_HOSTS = frozenset({"api.openai.com"}) _MTG_OPENAI_BATCH_SEPARATOR = "\n\n--- batch ---\n\n" _MTG_OPENAI_STATE_SELECTION = [ ("empty", "Not Analyzed"), ("done", "Ready"), ("failed", "Failed"), ] _MTG_OPENAI_ROLE_BATCH_SIZE = DEFAULT_MTG_OPENAI_ROLE_BATCH_SIZE _MTG_OPENAI_FILL_CANDIDATE_LIMIT = DEFAULT_MTG_OPENAI_FILL_CANDIDATE_LIMIT _MTG_OPENAI_FILL_BATCH_SIZE = DEFAULT_MTG_OPENAI_FILL_BATCH_SIZE _MTG_OPENAI_ALTERNATIVE_CANDIDATE_LIMIT = DEFAULT_MTG_OPENAI_ALTERNATIVE_CANDIDATE_LIMIT _MTG_OPENAI_RUN_FIELD_MAP = { "analysis": { "state": "mtg_openai_analysis_state", "error": "mtg_openai_last_error", "analyzed_at": "mtg_openai_last_analyzed_at", "model": "mtg_openai_last_model", "lang": "mtg_openai_last_lang", "prompt": "mtg_openai_last_prompt", "response": "mtg_openai_last_response", }, "role": { "state": "mtg_openai_role_analysis_state", "error": "mtg_openai_role_last_error", "analyzed_at": "mtg_openai_role_last_analyzed_at", "model": "mtg_openai_role_last_model", "lang": "mtg_openai_role_last_lang", "prompt": "mtg_openai_role_last_prompt", "response": "mtg_openai_role_last_response", }, "alternative": { "state": "mtg_openai_alternative_state", "error": "mtg_openai_alternative_last_error", "analyzed_at": "mtg_openai_alternative_last_analyzed_at", "model": "mtg_openai_alternative_last_model", "lang": "mtg_openai_alternative_last_lang", "prompt": "mtg_openai_alternative_last_prompt", "response": "mtg_openai_alternative_last_response", }, "fill": { "state": "mtg_openai_fill_state", "error": "mtg_openai_fill_last_error", "analyzed_at": "mtg_openai_fill_last_analyzed_at", "model": "mtg_openai_fill_last_model", "lang": "mtg_openai_fill_last_lang", "prompt": "mtg_openai_fill_last_prompt", "response": "mtg_openai_fill_last_response", }, } mtg_openai_analysis_state = fields.Selection( selection=_MTG_OPENAI_STATE_SELECTION, string="Deck Analysis Status", default="empty", readonly=True, copy=False, ) mtg_openai_last_analyzed_at = fields.Datetime( string="Last Deck Analysis", readonly=True, copy=False, ) mtg_openai_last_model = fields.Char( string="Analysis Model", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_last_lang = fields.Char( string="Analysis Language", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_commander_summary = fields.Html( string="Commander Summary", readonly=True, copy=False, ) mtg_openai_gameplan = fields.Html( string="Game Plan", readonly=True, copy=False, ) mtg_openai_pilot_tips = fields.Html( string="Pilot Tips", readonly=True, copy=False, ) mtg_openai_risk_notes = fields.Html( string="Risk Notes", readonly=True, copy=False, ) mtg_openai_last_error = fields.Text( string="Last Analysis Error", readonly=True, copy=False, ) mtg_openai_last_prompt = fields.Text( string="Last Analysis Prompt", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_last_response = fields.Text( string="Last Analysis Response", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_role_analysis_state = fields.Selection( selection=_MTG_OPENAI_STATE_SELECTION, string="Role Analysis Status", default="empty", readonly=True, copy=False, ) mtg_openai_role_last_analyzed_at = fields.Datetime( string="Last Role Analysis", readonly=True, copy=False, ) mtg_openai_role_last_model = fields.Char( string="Role Analysis Model", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_role_last_lang = fields.Char( string="Role Analysis Language", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_role_last_error = fields.Text( string="Last Role Analysis Error", readonly=True, copy=False, ) mtg_openai_role_last_prompt = fields.Text( string="Last Role Analysis Prompt", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_role_last_response = fields.Text( string="Last Role Analysis Response", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_alternative_state = fields.Selection( selection=_MTG_OPENAI_STATE_SELECTION, string="Alternative Analysis Status", default="empty", readonly=True, copy=False, ) mtg_openai_alternative_last_analyzed_at = fields.Datetime( string="Last Alternative Analysis", readonly=True, copy=False, ) mtg_openai_alternative_last_model = fields.Char( string="Alternative Analysis Model", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_alternative_last_lang = fields.Char( string="Alternative Analysis Language", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_alternative_suggestions = fields.Html( string="Alternative Suggestions", readonly=True, copy=False, ) mtg_openai_alternative_last_error = fields.Text( string="Last Alternative Analysis Error", readonly=True, copy=False, ) mtg_openai_alternative_last_prompt = fields.Text( string="Last Alternative Analysis Prompt", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_alternative_last_response = fields.Text( string="Last Alternative Analysis Response", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_fill_state = fields.Selection( selection=_MTG_OPENAI_STATE_SELECTION, string="Deck Fill Status", default="empty", readonly=True, copy=False, ) mtg_openai_fill_last_analyzed_at = fields.Datetime( string="Last Deck Fill", readonly=True, copy=False, ) mtg_openai_fill_last_model = fields.Char( string="Deck Fill Model", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_fill_last_lang = fields.Char( string="Deck Fill Language", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_fill_summary = fields.Html( string="Deck Fill Summary", readonly=True, copy=False, ) mtg_openai_fill_last_error = fields.Text( string="Last Deck Fill Error", readonly=True, copy=False, ) mtg_openai_fill_last_prompt = fields.Text( string="Last Deck Fill Prompt", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) mtg_openai_fill_last_response = fields.Text( string="Last Deck Fill Response", readonly=True, copy=False, groups="mvd_tcg_base.mvd_tcg_base_group_administrator,base.group_system", ) def _mtg_openai_check_manager_access(self): """Require manager-level rights for OpenAI-triggering actions. Returns: bool: ``True`` when the current user may trigger OpenAI flows. """ if self.env.is_superuser() or any( self.env.user.has_group(xmlid) for xmlid in ( "mvd_tcg_base.mvd_tcg_base_group_manager", "base.group_system", ) ): return True raise UserError( _("Only TCG managers can run OpenAI deck analysis actions.") ) @classmethod def _mtg_openai_validate_base_url(cls, url): """Validate the configured OpenAI base URL. Args: url: Absolute OpenAI API base URL. Returns: str: Validated absolute base URL without a trailing slash. Raises: UserError: If the URL does not target the official HTTPS API host. """ normalized_url = (url or "").strip().rstrip("/") parsed_url = parse.urlparse(normalized_url) hostname = (parsed_url.hostname or "").lower() if parsed_url.scheme != "https" or hostname not in cls._MTG_OPENAI_ALLOWED_BASE_HOSTS: raise UserError( _("OpenAI API Base URL must use HTTPS and the official api.openai.com host.") ) return normalized_url def action_mtg_openai_analyze_deck(self): """Analyze one MTG deck with OpenAI and persist the result. The analysis language follows the current user language, while the Oracle text payload remains pinned to ``en_US`` for rules accuracy. """ self.ensure_one() self._mtg_openai_check_manager_access() if not self.is_mtg_deck: raise UserError(_("OpenAI deck analysis is only available for Magic decks.")) api_key = self._mtg_openai_get_api_key() model_name = self._mtg_openai_get_model_name() analysis_lang = self._mtg_openai_get_analysis_lang() prompt = self._mtg_openai_build_prompt(analysis_lang) try: analysis_payload, raw_response = self._mtg_openai_request_analysis( api_key=api_key, model_name=model_name, prompt=prompt, ) except UserError as exc: self.write( { "mtg_openai_analysis_state": "failed", "mtg_openai_last_error": str(exc), "mtg_openai_last_analyzed_at": fields.Datetime.now(), "mtg_openai_last_model": model_name, "mtg_openai_last_lang": analysis_lang, "mtg_openai_last_prompt": prompt, } ) raise self.write( { "mtg_openai_analysis_state": "done", "mtg_openai_last_error": False, "mtg_openai_last_analyzed_at": fields.Datetime.now(), "mtg_openai_last_model": model_name, "mtg_openai_last_lang": analysis_lang, "mtg_openai_last_prompt": prompt, "mtg_openai_last_response": raw_response, "mtg_openai_commander_summary": self._mtg_openai_render_paragraphs( analysis_payload.get("commander_summary") ), "mtg_openai_gameplan": self._mtg_openai_render_bullets( analysis_payload.get("gameplan_bullets") ), "mtg_openai_pilot_tips": self._mtg_openai_render_bullets( analysis_payload.get("pilot_tips") ), "mtg_openai_risk_notes": self._mtg_openai_render_bullets( analysis_payload.get("risk_notes") ), } ) return { "type": "ir.actions.act_window", "res_model": "mvd.tcg.deck", "res_id": self.id, "view_mode": "form", "target": "current", } def action_mtg_openai_analyze_card_roles(self): """Analyze role tags for the cards of one MTG deck. The analysis language follows the active user language, while ``oracle_text`` remains pinned to ``en_US`` for rules accuracy. Returns: dict: Window action that reloads the current deck form. """ self.ensure_one() self._mtg_openai_check_manager_access() if not self.is_mtg_deck: raise UserError(_("OpenAI card analysis is only available for Magic decks.")) api_key = self._mtg_openai_get_api_key() model_name = self._mtg_openai_get_model_name() analysis_lang = self._mtg_openai_get_analysis_lang() role_map = self._mtg_openai_get_role_key_map() card_payload = self._mtg_openai_collect_role_payload() if not card_payload: raise UserError(_("This deck has no cards to analyze.")) batch_size = self._mtg_openai_get_role_batch_size() prompt_batches = [] response_batches = [] pending_updates = [] try: for batch_start in range(0, len(card_payload), batch_size): batch_payload = card_payload[batch_start:batch_start + batch_size] prompt = self._mtg_openai_build_role_prompt( analysis_lang=analysis_lang, role_map=role_map, batch_payload=batch_payload, ) prompt_batches.append(prompt) role_payload, raw_response = self._mtg_openai_request_role_analysis( api_key=api_key, model_name=model_name, prompt=prompt, ) response_batches.append(raw_response) pending_updates.extend( self._mtg_openai_prepare_role_updates( batch_payload=batch_payload, role_payload=role_payload, role_map=role_map, ) ) except UserError as exc: self.write( { "mtg_openai_role_analysis_state": "failed", "mtg_openai_role_last_error": str(exc), "mtg_openai_role_last_analyzed_at": fields.Datetime.now(), "mtg_openai_role_last_model": model_name, "mtg_openai_role_last_lang": analysis_lang, "mtg_openai_role_last_prompt": "\n\n--- batch ---\n\n".join( prompt_batches ), "mtg_openai_role_last_response": "\n\n--- batch ---\n\n".join( response_batches ) or False, } ) raise for values in pending_updates: line = self.env["mvd.tcg.deck.line"].browse(values["line_id"]) line.write( { "role_ids": [(6, 0, values["role_ids"])], "mtg_openai_role_rationale": values["rationale"], } ) self.write( { "mtg_openai_role_analysis_state": "done", "mtg_openai_role_last_error": False, "mtg_openai_role_last_analyzed_at": fields.Datetime.now(), "mtg_openai_role_last_model": model_name, "mtg_openai_role_last_lang": analysis_lang, "mtg_openai_role_last_prompt": "\n\n--- batch ---\n\n".join( prompt_batches ), "mtg_openai_role_last_response": "\n\n--- batch ---\n\n".join( response_batches ), } ) return { "type": "ir.actions.act_window", "res_model": "mvd.tcg.deck", "res_id": self.id, "view_mode": "form", "target": "current", } def action_mtg_openai_suggest_alternatives(self): """Suggest replacement cards for current MTG issue lines.""" self.ensure_one() self._mtg_openai_check_manager_access() if not self.is_mtg_deck: raise UserError( _("OpenAI alternative suggestions are only available for Magic decks.") ) issue_payload = self._mtg_openai_collect_alternative_payload() if not issue_payload: raise UserError(_("This deck has no replaceable issue lines right now.")) api_key = self._mtg_openai_get_api_key() model_name = self._mtg_openai_get_model_name() analysis_lang = self._mtg_openai_get_analysis_lang() prompt = self._mtg_openai_build_alternative_prompt( analysis_lang=analysis_lang, issue_payload=issue_payload, ) try: alternative_payload, raw_response = self._mtg_openai_request_alternative_analysis( api_key=api_key, model_name=model_name, prompt=prompt, ) prepared_suggestions = self._mtg_openai_prepare_alternative_suggestions( issue_payload=issue_payload, alternative_payload=alternative_payload, ) except UserError as exc: self.write( { "mtg_openai_alternative_state": "failed", "mtg_openai_alternative_last_error": str(exc), "mtg_openai_alternative_last_analyzed_at": fields.Datetime.now(), "mtg_openai_alternative_last_model": model_name, "mtg_openai_alternative_last_lang": analysis_lang, "mtg_openai_alternative_last_prompt": prompt, } ) raise self.write( { "mtg_openai_alternative_state": "done", "mtg_openai_alternative_last_error": False, "mtg_openai_alternative_last_analyzed_at": fields.Datetime.now(), "mtg_openai_alternative_last_model": model_name, "mtg_openai_alternative_last_lang": analysis_lang, "mtg_openai_alternative_last_prompt": prompt, "mtg_openai_alternative_last_response": raw_response, "mtg_openai_alternative_suggestions": self._mtg_openai_render_alternative_suggestions( prepared_suggestions ), } ) return { "type": "ir.actions.act_window", "res_model": "mvd.tcg.deck", "res_id": self.id, "view_mode": "form", "target": "current", } def action_mtg_openai_fill_deck(self): """Fill missing mainboard slots from the in-system MTG card pool.""" self.ensure_one() self._mtg_openai_check_manager_access() if not self.is_mtg_deck: raise UserError(_("OpenAI deck fill is only available for Magic decks.")) if not self.mtg_commander_card_id: raise UserError(_("Select at least one commander card first.")) if not self.mtg_mainboard_board_id: raise UserError(_("This deck does not have a mainboard yet.")) if not self.mtg_expected_mainboard_size: raise UserError(_("Select a supported MTG format before using deck fill.")) remaining_slots = self.mtg_expected_mainboard_size - self.mtg_mainboard_count if remaining_slots <= 0: raise UserError(_("The mainboard is already full for the selected format.")) api_key = self._mtg_openai_get_api_key() model_name = self._mtg_openai_get_model_name() analysis_lang = self._mtg_openai_get_analysis_lang() rendered_batches = [] prompt_batches = [] response_batches = [] total_added = 0 try: while remaining_slots > 0: candidate_payload = self._mtg_openai_collect_fill_candidate_payload( limit=self._mtg_openai_get_fill_candidate_limit() ) if not candidate_payload: break batch_size = min( remaining_slots, self._mtg_openai_get_fill_batch_size(), len(candidate_payload), ) prompt = self._mtg_openai_build_fill_prompt( analysis_lang=analysis_lang, candidate_payload=candidate_payload, batch_size=batch_size, ) prompt_batches.append(prompt) fill_payload, raw_response = self._mtg_openai_request_fill_selection( api_key=api_key, model_name=model_name, prompt=prompt, ) response_batches.append(raw_response) prepared_fill = self._mtg_openai_prepare_fill_selection( candidate_payload=candidate_payload, fill_payload=fill_payload, max_cards=batch_size, ) if not prepared_fill["cards"]: break rendered_batches.append(prepared_fill) for suggestion in prepared_fill["cards"]: card = self.env["mvd.tcg.card"].browse(suggestion["card_id"]).exists() if not card: continue self._mtg_openai_add_card_to_board( card=card, board=self.mtg_mainboard_board_id, quantity=suggestion["quantity"], ) total_added += suggestion["quantity"] self.invalidate_recordset( [ "mtg_mainboard_count", "mtg_rule_warning_count", "mtg_duplicate_card_count", ] ) remaining_slots = self.mtg_expected_mainboard_size - self.mtg_mainboard_count except UserError as exc: self.write( { "mtg_openai_fill_state": "failed", "mtg_openai_fill_last_error": str(exc), "mtg_openai_fill_last_analyzed_at": fields.Datetime.now(), "mtg_openai_fill_last_model": model_name, "mtg_openai_fill_last_lang": analysis_lang, "mtg_openai_fill_last_prompt": "\n\n--- batch ---\n\n".join( prompt_batches ), "mtg_openai_fill_last_response": "\n\n--- batch ---\n\n".join( response_batches ) or False, } ) raise summary_markup = self._mtg_openai_render_fill_summary( fill_batches=rendered_batches, total_added=total_added, remaining_slots=max( 0, self.mtg_expected_mainboard_size - self.mtg_mainboard_count, ), ) self.write( { "mtg_openai_fill_state": "done", "mtg_openai_fill_last_error": False, "mtg_openai_fill_last_analyzed_at": fields.Datetime.now(), "mtg_openai_fill_last_model": model_name, "mtg_openai_fill_last_lang": analysis_lang, "mtg_openai_fill_last_prompt": "\n\n--- batch ---\n\n".join( prompt_batches ), "mtg_openai_fill_last_response": "\n\n--- batch ---\n\n".join( response_batches ) or False, "mtg_openai_fill_summary": summary_markup, } ) return { "type": "ir.actions.act_window", "res_model": "mvd.tcg.deck", "res_id": self.id, "view_mode": "form", "target": "current", } def _mtg_openai_get_api_key(self): """Return the configured OpenAI API key. Returns: str: The configured OpenAI API key. """ config_key = "mvd_tcg_mtg_deck_openai.api_key" api_key = (self._mtg_openai_get_config_parameter(config_key) or "").strip() if not api_key: api_key = (os.getenv("OPENAI_API_KEY") or "").strip() if not api_key: raise UserError(_("Configure an OpenAI API key in TCG Settings first.")) return api_key def _mtg_openai_get_config_parameter(self, name, default=False): """Return one OpenAI configuration parameter. Args: name: Technical config parameter key. default: Fallback value when the parameter is unset. Returns: str | bool: Stored config value or the provided fallback. """ return self.env["ir.config_parameter"].sudo().get_param(name, default) def _mtg_openai_get_positive_integer_setting(self, name, default): """Return one positive integer connector setting. Args: name: Technical config parameter key. default: Fallback integer when the parameter is unset or invalid. Returns: int: Positive integer configuration value. """ raw_value = self._mtg_openai_get_config_parameter(name, default) try: return max(1, int(raw_value)) except (TypeError, ValueError): return default def _mtg_openai_get_model_name(self): """Return the configured OpenAI model name. Returns: str: The OpenAI model identifier. """ config_key = "mvd_tcg_mtg_deck_openai.model_name" model_name = ( self._mtg_openai_get_config_parameter( config_key, DEFAULT_MTG_OPENAI_MODEL_NAME, ) or DEFAULT_MTG_OPENAI_MODEL_NAME ).strip() if model_name == DEFAULT_MTG_OPENAI_MODEL_NAME: env_model_name = (os.getenv("OPENAI_MODEL") or "").strip() if env_model_name: return env_model_name return model_name def _mtg_openai_get_api_base_url(self): """Return the configured OpenAI API base URL. Returns: str: Base URL for Responses API requests. """ return self._mtg_openai_validate_base_url( ( self._mtg_openai_get_config_parameter( "mvd_tcg_mtg_deck_openai.api_base_url", DEFAULT_MTG_OPENAI_API_BASE_URL, ) or DEFAULT_MTG_OPENAI_API_BASE_URL ) ) def _mtg_openai_get_request_timeout_seconds(self): """Return the configured OpenAI request timeout.""" return self._mtg_openai_get_positive_integer_setting( "mvd_tcg_mtg_deck_openai.request_timeout_seconds", DEFAULT_MTG_OPENAI_REQUEST_TIMEOUT_SECONDS, ) def _mtg_openai_get_role_batch_size(self): """Return the configured role-analysis batch size.""" return self._mtg_openai_get_positive_integer_setting( "mvd_tcg_mtg_deck_openai.role_batch_size", self._MTG_OPENAI_ROLE_BATCH_SIZE, ) def _mtg_openai_get_fill_candidate_limit(self): """Return the configured fill candidate pool size.""" return self._mtg_openai_get_positive_integer_setting( "mvd_tcg_mtg_deck_openai.fill_candidate_limit", self._MTG_OPENAI_FILL_CANDIDATE_LIMIT, ) def _mtg_openai_get_fill_batch_size(self): """Return the configured fill selection batch size.""" return self._mtg_openai_get_positive_integer_setting( "mvd_tcg_mtg_deck_openai.fill_batch_size", self._MTG_OPENAI_FILL_BATCH_SIZE, ) def _mtg_openai_get_alternative_candidate_limit(self): """Return the configured alternative suggestion pool size.""" return self._mtg_openai_get_positive_integer_setting( "mvd_tcg_mtg_deck_openai.alternative_candidate_limit", self._MTG_OPENAI_ALTERNATIVE_CANDIDATE_LIMIT, ) def _mtg_openai_get_analysis_lang(self): """Return the active analysis language code. Returns: str: Odoo language code from context or user preferences. """ return ( self.env.context.get("lang") or self.env.user.lang or "en_US" ) def _mtg_openai_get_analysis_language_name(self, lang_code): """Resolve a human-readable language name for one Odoo language code. Args: lang_code: Odoo language code such as ``de_DE``. Returns: str: Display name for prompting, for example ``German``. """ language = self.env["res.lang"].sudo().search([("code", "=", lang_code)], limit=1) return language.name or lang_code or "English" @classmethod def _mtg_openai_join_batches(cls, items): """Join batched prompts or responses with one stable separator. Args: items: Ordered prompt or response chunks. Returns: str | bool: Joined payload text, or ``False`` when empty. """ cleaned_items = [item for item in items if item] if not cleaned_items: return False return cls._MTG_OPENAI_BATCH_SEPARATOR.join(cleaned_items) def _mtg_openai_write_run_state( self, run_key, *, state, model_name, analysis_lang, prompt=False, response=False, error_message=False, extra_values=None, ): """Persist one OpenAI run state update on the current deck. Args: run_key: Stable OpenAI run key such as ``analysis`` or ``fill``. state: Target state value. model_name: OpenAI model identifier used for the run. analysis_lang: Active Odoo analysis language code. prompt: Serialized prompt text or batch text. response: Raw response text or joined batch responses. error_message: Optional failure message. extra_values: Optional additional field values to persist. """ self.ensure_one() field_map = self._MTG_OPENAI_RUN_FIELD_MAP[run_key] values = { field_map["state"]: state, field_map["error"]: error_message or False, field_map["analyzed_at"]: fields.Datetime.now(), field_map["model"]: model_name, field_map["lang"]: analysis_lang, field_map["prompt"]: prompt or False, field_map["response"]: response or False, } if extra_values: values.update(extra_values) self.write(values) @staticmethod def _mtg_openai_build_faces_payload(card): """Serialize all explicit MTG card faces for AI prompts. Args: card: MTG card record in a stable language context. Returns: list[dict[str, object]]: Ordered face payloads. """ return [ { "name": section["name"], "mana_cost": section["mana_cost"], "type_line": section["type_line"], "oracle_text": section["oracle_text"], } for section in card.mtg_get_rules_sections() ] def _mtg_openai_build_rules_payload(self, card): """Return one reusable rules payload for prompts. Args: card: MTG card record, in any language context. Returns: dict[str, object]: Face-aware rules payload built from English text. """ self.ensure_one() english_card = card.with_context(lang="en_US") return { "oracle_text": english_card.mtg_get_rules_summary(), "face_count": english_card.mtg_face_count, "faces": self._mtg_openai_build_faces_payload(english_card), } def _mtg_openai_build_card_reference_payload(self, card): """Return one reusable English card payload for AI prompts.""" self.ensure_one() english_card = card.with_context(lang="en_US") return { "name": english_card.display_name, "mana_cost": english_card.mtg_mana_cost or False, "mana_value": english_card.mtg_mana_value or 0.0, "type_line": english_card.mtg_type_line or False, "set": english_card.mtg_set_id.display_name if english_card.mtg_set_id else False, "collector_number": english_card.mtg_collector_number or False, **self._mtg_openai_build_rules_payload(english_card), } def _mtg_openai_build_prompt(self, analysis_lang): """Build the full analysis prompt for one MTG deck. Args: analysis_lang: Target output language code. Returns: str: Final prompt sent to the OpenAI Responses API. """ self.ensure_one() analysis_language_name = self._mtg_openai_get_analysis_language_name( analysis_lang ) deck_payload = { **self._mtg_openai_get_deck_identity_payload(), "metrics": self._mtg_openai_get_metric_payload(), "boards": self._mtg_openai_collect_board_payload(), } return ( "Analyze this Magic: The Gathering deck as an experienced Commander deck builder. " "Explain how the deck functions from a pilot perspective. Focus on the actual card pool, " "the commander, the mana curve, synergy packages, likely play patterns, and how the deck " "wins or stabilizes. The first paragraph should work as a concise short description of the deck. " "Do not invent cards or text. If something is uncertain, say so briefly. " f"Write every value in the JSON response in {analysis_language_name}. " "The provided oracle_text values are intentionally in English and must be treated as the authoritative rules text input. " "Multi-face cards may also include a faces array with the authoritative English rules text for each printed face. " "Treat those cards as one deck slot that can offer multiple spell options.\n\n" "Return JSON with exactly this shape:\n" "{\n" ' "commander_summary": "2-3 short paragraphs in plain text that explain how the deck works. The first paragraph must read like a compact deck blurb.",\n' ' "gameplan_bullets": ["3-6 concise bullets"],\n' ' "pilot_tips": ["3-6 concise bullets"],\n' ' "risk_notes": ["2-5 concise bullets"]\n' "}\n\n" "Deck data:\n" f"{json.dumps(deck_payload, ensure_ascii=False, indent=2)}" ) def _mtg_openai_build_role_prompt(self, analysis_lang, role_map, batch_payload): """Build one prompt that classifies role tags for deck lines. Args: analysis_lang: Target output language code. role_map: Mapping from stable role keys to deck-role records. batch_payload: Deck-line batch to classify. Returns: str: Final prompt for one role-analysis batch. """ analysis_language_name = self._mtg_openai_get_analysis_language_name( analysis_lang ) role_payload = [ { "key": role_key, "name": role_record.display_name, "note": role_record.note or False, } for role_key, role_record in role_map.items() ] return ( "Analyze these Magic: The Gathering Commander deck cards and assign deckbuilding roles. " "Use only the allowed role keys. A card may have zero, one, or multiple roles. " "Prefer precise role tags over broad tagging. Do not invent cards or rules text. " f"Write every rationale in {analysis_language_name}. " "The provided oracle_text values are intentionally in English and must be treated as the authoritative rules text input. " "Multi-face cards may also include a faces array with the authoritative English rules text for each printed face. " "Treat those cards as one deck slot that can offer multiple spell options.\n\n" "Return JSON with exactly this shape:\n" "{\n" ' "cards": [\n' " {\n" ' "line_id": 123,\n' ' "role_keys": ["ramp", "value"],\n' ' "rationale": "One short sentence."\n' " }\n" " ]\n" "}\n\n" "Every input line_id must appear exactly once in the cards array.\n\n" "Allowed roles:\n" f"{json.dumps(role_payload, ensure_ascii=False, indent=2)}\n\n" "Cards to classify:\n" f"{json.dumps(batch_payload, ensure_ascii=False, indent=2)}" ) def _mtg_openai_build_alternative_prompt(self, analysis_lang, issue_payload): """Build the prompt that suggests replacements for problematic lines.""" self.ensure_one() analysis_language_name = self._mtg_openai_get_analysis_language_name( analysis_lang ) deck_payload = { **self._mtg_openai_get_deck_identity_payload(), "issues": issue_payload, } return ( "You are helping to repair a Magic: The Gathering Commander deck. " "For each problematic deck line, suggest 2 to 3 replacement cards from the provided in-system candidate pool. " "Keep the deck inside the active commander color identity and avoid singleton conflicts. " "Prefer candidates that preserve the card's likely job in the deck, such as ramp, draw, removal, or threat density. " "Only use candidate card_ids listed under each issue line. " f"Write every explanation in {analysis_language_name}. " "The provided oracle_text values are intentionally in English and must be treated as the authoritative rules text input. " "Multi-face cards may include a faces array and should still be treated as one card slot.\n\n" "Return JSON with exactly this shape:\n" "{\n" ' "alternatives": [\n' " {\n" ' "line_id": 123,\n' ' "summary": "One short sentence.",\n' ' "suggestions": [\n' " {\n" ' "card_id": 456,\n' ' "reason": "One short sentence."\n' " }\n" " ]\n" " }\n" " ]\n" "}\n\n" "Every input line_id must appear exactly once. If no useful replacement exists for a line, return an empty suggestions list for that line.\n\n" "Deck repair payload:\n" f"{json.dumps(deck_payload, ensure_ascii=False, indent=2)}" ) def _mtg_openai_build_fill_prompt(self, analysis_lang, candidate_payload, batch_size): """Build the prompt that fills missing mainboard slots.""" self.ensure_one() analysis_language_name = self._mtg_openai_get_analysis_language_name( analysis_lang ) deck_payload = { **self._mtg_openai_get_deck_identity_payload(), "current_metrics": { **self._mtg_openai_get_metric_payload(), "tagged_cards": self.mtg_tagged_line_count, "untagged_cards": self.mtg_untagged_line_count, "role_coverage_ratio": self.mtg_role_coverage_ratio, "remaining_mainboard_slots": self.mtg_expected_mainboard_size - self.mtg_mainboard_count, }, "candidates": candidate_payload, } return ( "You are finishing a Magic: The Gathering Commander deck from an in-system card pool. " "Select the strongest additions for the next batch of missing mainboard slots. " "Balance mana development, card flow, interaction, threats, and commander synergy. " "Use only candidate card_ids from the provided pool. " "Do not invent cards. " "Use quantity values greater than 1 only when allows_multiple_copies is true. " f"Write every explanation in {analysis_language_name}. " "The provided oracle_text values are intentionally in English and must be treated as the authoritative rules text input. " "Multi-face cards may include a faces array and should still be treated as one card slot.\n\n" "Return JSON with exactly this shape:\n" "{\n" ' "summary": "One short paragraph.",\n' ' "cards": [\n' " {\n" ' "card_id": 456,\n' ' "quantity": 1,\n' ' "reason": "One short sentence."\n' " }\n" " ]\n" "}\n\n" f"Select at most {batch_size} total cards in this batch.\n\n" "Deck fill payload:\n" f"{json.dumps(deck_payload, ensure_ascii=False, indent=2)}" ) def _mtg_openai_get_deck_identity_payload(self): """Return the stable deck identity payload reused across prompts. Returns: dict[str, object]: Deck name, format, commander, and color identity. """ self.ensure_one() return { "deck_name": self.name, "format": self.mtg_format_id.display_name or False, "commander": self.mtg_commander_card_id.display_name or False, "color_identity": { "name": self.mtg_color_identity_name or False, "signature": self.mtg_color_identity_signature or False, }, } def _mtg_openai_get_metric_payload(self): """Return the stable MTG metric payload reused across prompts. Returns: dict[str, object]: Current deck metric snapshot. """ self.ensure_one() return { "mainboard_count": self.mtg_mainboard_count, "command_zone_count": self.mtg_command_zone_count, "sideboard_count": self.mtg_sideboard_count, "maybeboard_count": self.mtg_maybeboard_count, "average_mana_value": self.mtg_average_mana_value, "land_count": self.mtg_land_count, "creature_count": self.mtg_creature_count, "artifact_count": self.mtg_artifact_count, "enchantment_count": self.mtg_enchantment_count, "planeswalker_count": self.mtg_planeswalker_count, "instant_count": self.mtg_instant_count, "sorcery_count": self.mtg_sorcery_count, } def _mtg_openai_collect_board_payload(self): """Collect a structured board snapshot for the OpenAI prompt. Oracle text is read explicitly in ``en_US`` to keep the rules input stable even when the active user works in another language. Returns: list[dict[str, object]]: Board and card payloads for the prompt. """ self.ensure_one() board_payload = [] for board in self.board_ids.sorted( lambda current_board: (current_board.sequence, current_board.id) ): if not board.include_in_total and board.code != "command_zone": continue card_payloads = [] for line in board.line_ids.sorted( lambda current_line: ( current_line.board_sequence, getattr(current_line, "mtg_mana_value", 0.0), current_line.card_id.display_name or "", current_line.id, ) ): card_payload = self._mtg_openai_build_card_reference_payload(line.card_id) if ( card_payload["face_count"] <= 1 and card_payload["type_line"] and "Land" in card_payload["type_line"] ): card_payload["oracle_text"] = False card_payloads.append( { "quantity": line.quantity, "roles": line.role_ids.sorted( key=lambda role: (role.sequence, role.name or "", role.id) ).mapped("name"), **card_payload, } ) board_payload.append( { "name": board.name, "code": board.code, "include_in_total": board.include_in_total, "cards": card_payloads, } ) return board_payload def _mtg_openai_collect_role_payload(self): """Collect deck-line payloads for role classification. Returns: list[dict[str, object]]: Structured deck-line payloads. """ self.ensure_one() payload = [] for line in self.line_ids.sorted( lambda current_line: ( current_line.board_sequence, getattr(current_line, "mtg_mana_value", 0.0), current_line.card_id.display_name or "", current_line.id, ) ): payload.append( { "line_id": line.id, "board": line.board_id.code or line.board_id.name, "quantity": line.quantity, **self._mtg_openai_build_card_reference_payload(line.card_id), } ) return payload def _mtg_openai_collect_alternative_payload(self): """Collect issue lines plus per-line replacement candidates.""" self.ensure_one() issue_payload = [] issue_lines = self.line_ids.filtered( lambda line: line.board_id.include_in_total and line.mtg_issue_count ).sorted( lambda current_line: ( current_line.board_sequence, current_line.primary_role_sequence, current_line.mtg_mana_value, current_line.card_id.display_name or "", current_line.id, ) ) for line in issue_lines: candidates = self._mtg_openai_get_alternative_candidate_cards( issue_line=line, limit=self._mtg_openai_get_alternative_candidate_limit(), ) if not candidates: continue issue_payload.append( { "line_id": line.id, "card": self._mtg_openai_build_line_payload(line), "issues": self._mtg_openai_get_issue_labels(line), "candidates": [ self._mtg_openai_build_card_payload(card) for card in candidates ], } ) return issue_payload def _mtg_openai_collect_fill_candidate_payload(self, limit): """Collect a balanced candidate pool for AI-driven deck fill.""" self.ensure_one() candidates = self._mtg_openai_get_fill_candidate_cards(limit=limit) return [self._mtg_openai_build_card_payload(card) for card in candidates] def _mtg_openai_get_issue_labels(self, line): """Return stable issue labels for one MTG deck line.""" issue_labels = [] if line.mtg_singleton_violation: issue_labels.append("singleton") if line.mtg_color_identity_violation: issue_labels.append("color_identity") if not line.mtg_legality_ok: issue_labels.append(line.mtg_legality_status or "illegal") return issue_labels or ["issue"] def _mtg_openai_build_line_payload(self, line): """Serialize one deck line for OpenAI prompts.""" return { "line_id": line.id, "board": line.board_id.code or line.board_id.name, "quantity": line.quantity, "roles": line.role_ids.sorted( key=lambda role: (role.sequence, role.name or "", role.id) ).mapped("name"), **self._mtg_openai_build_card_reference_payload(line.card_id), } def _mtg_openai_build_card_payload(self, card): """Serialize one MTG card candidate for OpenAI prompts.""" english_card = card.with_context(lang="en_US") primary_types = english_card.mtg_card_type_ids.sorted( key=lambda card_type: (card_type.sequence, card_type.name or "", card_type.id) ).mapped("name") legality_status = self.env["mvd.tcg.deck.line"]._mtg_get_card_legality_status( english_card, self.mtg_format_id.code or "", ) return { "card_id": english_card.id, "color_identity": { "signature": english_card.mtg_color_identity_signature or False, "colors": english_card.mtg_color_identity_ids.sorted( key=lambda color: (color.sequence, color.code or "", color.id) ).mapped("name"), }, **self._mtg_openai_build_card_reference_payload(english_card), "rarity": english_card.mtg_rarity_id.display_name if english_card.mtg_rarity_id else False, "types": primary_types, "allows_multiple_copies": english_card.mtg_allows_unlimited_copies(), "current_quantity": self._mtg_openai_get_current_mainboard_quantity(english_card), "legality_status": legality_status, } def _mtg_openai_get_current_mainboard_quantity(self, card): """Return the current mainboard quantity for one card record.""" self.ensure_one() mainboard_line = self.mtg_mainboard_line_ids.filtered( lambda line: line.card_id == card )[:1] return mainboard_line.quantity if mainboard_line else 0 def _mtg_openai_get_existing_singleton_aliases(self): """Return singleton aliases currently present in included MTG boards.""" self.ensure_one() aliases = set() for card in self.line_ids.filtered( lambda line: line.board_id.include_in_total and line.card_id.game_id.code == "mtg" ).mapped("card_id").with_context(lang="en_US"): aliases.update(card.mtg_get_singleton_key_aliases()) return aliases def _mtg_openai_identity_allows_card(self, card): """Return whether one card fits the current commander identity.""" self.ensure_one() commander_identity = set(self.mtg_color_identity_signature or "") if not commander_identity: return True return not (set(card.mtg_color_identity_signature or "") - commander_identity) def _mtg_openai_is_legal_candidate(self, card): """Return whether one card can be considered for the active format.""" legality_status = self.env["mvd.tcg.deck.line"]._mtg_get_card_legality_status( card, self.mtg_format_id.code or "", ) return legality_status not in {"banned", "not_legal"} def _mtg_openai_get_base_fill_candidates(self): """Return the coarse MTG candidate pool for deck fill.""" self.ensure_one() card_model = self.env["mvd.tcg.card"].with_context(lang="en_US") existing_aliases = self._mtg_openai_get_existing_singleton_aliases() candidates = card_model.browse() for card in card_model.search( [ ("game_id.code", "=", "mtg"), ("active", "=", True), ("mtg_is_token", "=", False), ("mtg_is_digital", "=", False), ], order="mtg_collector_sort_key, id", ): if not self._mtg_openai_identity_allows_card(card): continue if not self._mtg_openai_is_legal_candidate(card): continue if not card.mtg_allows_unlimited_copies() and ( existing_aliases & set(card.mtg_get_singleton_key_aliases()) ): continue candidates |= card return candidates def _mtg_openai_get_fill_candidate_cards(self, limit): """Return a balanced, size-limited MTG candidate pool for deck fill.""" self.ensure_one() type_priority = ( "land", "creature", "artifact", "enchantment", "instant", "sorcery", "planeswalker", "other", ) grouped_candidates = {type_code: [] for type_code in type_priority} for card in self._mtg_openai_get_base_fill_candidates(): type_codes = set(card.mtg_card_type_ids.mapped("code")) if "land" in type_codes: grouped_candidates["land"].append(card.id) elif "creature" in type_codes: grouped_candidates["creature"].append(card.id) elif "artifact" in type_codes: grouped_candidates["artifact"].append(card.id) elif "enchantment" in type_codes: grouped_candidates["enchantment"].append(card.id) elif "instant" in type_codes: grouped_candidates["instant"].append(card.id) elif "sorcery" in type_codes: grouped_candidates["sorcery"].append(card.id) elif "planeswalker" in type_codes: grouped_candidates["planeswalker"].append(card.id) else: grouped_candidates["other"].append(card.id) ordered_ids = [] indices = {type_code: 0 for type_code in type_priority} while len(ordered_ids) < limit: did_append = False for type_code in type_priority: bucket = grouped_candidates[type_code] bucket_index = indices[type_code] if bucket_index >= len(bucket): continue ordered_ids.append(bucket[bucket_index]) indices[type_code] += 1 did_append = True if len(ordered_ids) >= limit: break if not did_append: break return self.env["mvd.tcg.card"].browse(ordered_ids) def _mtg_openai_get_alternative_candidate_cards(self, issue_line, limit): """Return a narrow replacement pool for one problematic line.""" self.ensure_one() target_card = issue_line.card_id.with_context(lang="en_US") target_types = set(target_card.mtg_card_type_ids.mapped("code")) target_mana_value = target_card.mtg_mana_value or 0.0 existing_aliases = self._mtg_openai_get_existing_singleton_aliases() candidates = self.env["mvd.tcg.card"].browse() fallback_candidates = self.env["mvd.tcg.card"].browse() for card in self._mtg_openai_get_base_fill_candidates(): if card == target_card: continue if existing_aliases & set(card.mtg_get_singleton_key_aliases()): continue card_types = set(card.mtg_card_type_ids.mapped("code")) mana_gap = abs((card.mtg_mana_value or 0.0) - target_mana_value) if target_types and card_types and target_types & card_types: if mana_gap <= 3.0: candidates |= card else: fallback_candidates |= card else: fallback_candidates |= card ordered_candidates = (candidates | fallback_candidates).sorted( key=lambda card: ( 0 if target_types & set(card.mtg_card_type_ids.mapped("code")) else 1, abs((card.mtg_mana_value or 0.0) - target_mana_value), card.mtg_mana_value or 0.0, card.display_name or "", card.id, ) ) return ordered_candidates[:limit] def _mtg_openai_add_card_to_board(self, card, board, quantity=1): """Create or increment one deck line in a validated way.""" self.ensure_one() line_model = self.env["mvd.tcg.deck.line"] existing_line = line_model.search( [ ("board_id", "=", board.id), ("card_id", "=", card.id), ], limit=1, ) self._mvd_tcg_validate_add_to_board( card, board, quantity=quantity, existing_line=existing_line, ) if existing_line: existing_line.write({"quantity": existing_line.quantity + quantity}) return existing_line return line_model.create( { "board_id": board.id, "quantity": quantity, "card_id": card.id, } ) def _mtg_openai_get_role_key_map(self): """Return the supported deck-role records keyed by stable AI keys. Returns: dict[str, Model]: Stable role-key mapping. """ role_xmlids = { "ramp": "mvd_tcg_deck.mvd_tcg_deck_role_ramp", "draw": "mvd_tcg_deck.mvd_tcg_deck_role_draw", "removal": "mvd_tcg_deck.mvd_tcg_deck_role_removal", "interaction": "mvd_tcg_deck.mvd_tcg_deck_role_interaction", "protection": "mvd_tcg_deck.mvd_tcg_deck_role_protection", "wincon": "mvd_tcg_deck.mvd_tcg_deck_role_wincon", "value": "mvd_tcg_deck.mvd_tcg_deck_role_value", "combo": "mvd_tcg_deck.mvd_tcg_deck_role_combo", } role_keys = list(role_xmlids) role_model = self.env["mvd.tcg.deck.role"] roles = role_model.search([("technical_key", "in", role_keys)]) role_map = {role.technical_key: role for role in roles} missing_role_keys = [role_key for role_key in role_keys if role_key not in role_map] if missing_role_keys: for role_key in list(missing_role_keys): fallback_role = self.env.ref(role_xmlids[role_key], raise_if_not_found=False) if not fallback_role: continue role_map[role_key] = fallback_role missing_role_keys = [ role_key for role_key in role_keys if role_key not in role_map ] if missing_role_keys: raise UserError( _( "Missing configured deck roles for: %s" ) % ", ".join(missing_role_keys) ) return role_map def _mtg_openai_prepare_alternative_suggestions(self, issue_payload, alternative_payload): """Validate and normalize alternative suggestions returned by OpenAI.""" expected_ids = {item["line_id"] for item in issue_payload} candidate_ids_by_line = { item["line_id"]: {candidate["card_id"] for candidate in item["candidates"]} for item in issue_payload } issue_map = {item["line_id"]: item for item in issue_payload} prepared = [] seen_ids = set() for item in alternative_payload.get("alternatives") or []: line_id = item.get("line_id") if line_id not in expected_ids: raise UserError( _("OpenAI returned alternatives for an unknown deck line.") ) if line_id in seen_ids: raise UserError( _("OpenAI returned duplicate alternative rows for one deck line.") ) seen_ids.add(line_id) allowed_candidate_ids = candidate_ids_by_line[line_id] suggestions = [] seen_candidate_ids = set() for suggestion in item.get("suggestions") or []: card_id = suggestion.get("card_id") if card_id not in allowed_candidate_ids: raise UserError( _("OpenAI returned an unsupported replacement candidate.") ) if card_id in seen_candidate_ids: continue seen_candidate_ids.add(card_id) candidate = self.env["mvd.tcg.card"].browse(card_id).exists() if not candidate: continue suggestions.append( { "card_id": candidate.id, "name": candidate.display_name, "mana_cost": candidate.mtg_mana_cost or False, "type_line": candidate.mtg_type_line or False, "reason": (suggestion.get("reason") or "").strip() or False, } ) prepared.append( { "line_id": line_id, "card_name": issue_map[line_id]["card"]["name"], "issues": issue_map[line_id]["issues"], "summary": (item.get("summary") or "").strip() or False, "suggestions": suggestions, } ) if seen_ids != expected_ids: raise UserError( _("OpenAI did not return alternative rows for every issue line.") ) return prepared def _mtg_openai_prepare_fill_selection(self, candidate_payload, fill_payload, max_cards): """Validate and normalize one batch of fill-card selections.""" allowed_candidates = { item["card_id"]: item for item in candidate_payload } selected_cards = [] seen_card_ids = set() total_quantity = 0 for item in fill_payload.get("cards") or []: card_id = item.get("card_id") candidate = allowed_candidates.get(card_id) if not candidate: raise UserError(_("OpenAI returned a card outside the allowed pool.")) if card_id in seen_card_ids: raise UserError(_("OpenAI returned the same fill card more than once.")) seen_card_ids.add(card_id) quantity = int(item.get("quantity") or 1) if quantity <= 0: raise UserError(_("OpenAI returned an invalid fill quantity.")) if quantity > 1 and not candidate["allows_multiple_copies"]: raise UserError( _("OpenAI attempted to add multiple copies of a singleton card.") ) total_quantity += quantity if total_quantity > max_cards: raise UserError(_("OpenAI attempted to add too many cards in one batch.")) selected_cards.append( { "card_id": card_id, "quantity": quantity, "name": candidate["name"], "mana_cost": candidate["mana_cost"], "type_line": candidate["type_line"], "reason": (item.get("reason") or "").strip() or False, } ) return { "summary": (fill_payload.get("summary") or "").strip() or False, "cards": selected_cards, } def _mtg_openai_prepare_role_updates(self, batch_payload, role_payload, role_map): """Validate one role-analysis batch and convert it into line writes. Args: batch_payload: Input card payloads for one batch. role_payload: Parsed JSON payload returned by OpenAI. role_map: Supported role records keyed by stable AI keys. Returns: list[dict[str, object]]: Line updates ready for ORM writes. """ expected_ids = {item["line_id"] for item in batch_payload} cards = role_payload.get("cards") or [] seen_ids = set() updates = [] for item in cards: line_id = item.get("line_id") if line_id not in expected_ids: raise UserError( _("OpenAI returned a role analysis for an unknown deck line.") ) if line_id in seen_ids: raise UserError( _("OpenAI returned duplicate role analysis rows for one deck line.") ) role_ids = [] for role_key in item.get("role_keys") or []: role_record = role_map.get(role_key) if not role_record: raise UserError( _( "OpenAI returned an unsupported role key: %(role_key)s", role_key=role_key, ) ) role_ids.append(role_record.id) seen_ids.add(line_id) updates.append( { "line_id": line_id, "role_ids": role_ids, "rationale": (item.get("rationale") or "").strip() or False, } ) if seen_ids != expected_ids: raise UserError( _("OpenAI did not return role assignments for every deck line.") ) return updates def _mtg_openai_request_analysis(self, api_key, model_name, prompt): """Call the OpenAI Responses API and parse the JSON analysis payload. Args: api_key: OpenAI API key. model_name: OpenAI model identifier. prompt: Final analysis prompt. Returns: tuple[dict[str, object], str]: Parsed analysis JSON and raw response. """ schema = { "type": "object", "additionalProperties": False, "properties": { "commander_summary": {"type": "string"}, "gameplan_bullets": { "type": "array", "items": {"type": "string"}, }, "pilot_tips": { "type": "array", "items": {"type": "string"}, }, "risk_notes": { "type": "array", "items": {"type": "string"}, }, }, "required": [ "commander_summary", "gameplan_bullets", "pilot_tips", "risk_notes", ], } return self._mtg_openai_request_structured_payload( api_key=api_key, model_name=model_name, prompt=prompt, system_instruction=( "You are an expert Magic: The Gathering Commander analyst. " "You explain how decks function, how they sequence, and where " "their strengths and risks lie." ), response_name="mvd_tcg_mtg_commander_analysis", schema=schema, max_output_tokens=3200, ) def _mtg_openai_request_role_analysis(self, api_key, model_name, prompt): """Call the OpenAI Responses API for card-role classification. Args: api_key: OpenAI API key. model_name: OpenAI model identifier. prompt: Final role-analysis prompt. Returns: tuple[dict[str, object], str]: Parsed role JSON and raw response. """ schema = { "type": "object", "additionalProperties": False, "properties": { "cards": { "type": "array", "items": { "type": "object", "additionalProperties": False, "properties": { "line_id": {"type": "integer"}, "role_keys": { "type": "array", "items": { "type": "string", "enum": [ "ramp", "draw", "removal", "interaction", "protection", "wincon", "value", "combo", ], }, }, "rationale": {"type": "string"}, }, "required": ["line_id", "role_keys", "rationale"], }, } }, "required": ["cards"], } return self._mtg_openai_request_structured_payload( api_key=api_key, model_name=model_name, prompt=prompt, system_instruction=( "You are an expert Magic: The Gathering Commander deckbuilding analyst. " "You classify cards into practical deckbuilding roles such as ramp, draw, " "interaction, protection, win condition, value, and combo." ), response_name="mvd_tcg_mtg_card_role_analysis", schema=schema, max_output_tokens=6400, ) def _mtg_openai_request_alternative_analysis(self, api_key, model_name, prompt): """Call the Responses API for issue-line replacement suggestions.""" schema = { "type": "object", "additionalProperties": False, "properties": { "alternatives": { "type": "array", "items": { "type": "object", "additionalProperties": False, "properties": { "line_id": {"type": "integer"}, "summary": {"type": "string"}, "suggestions": { "type": "array", "items": { "type": "object", "additionalProperties": False, "properties": { "card_id": {"type": "integer"}, "reason": {"type": "string"}, }, "required": ["card_id", "reason"], }, }, }, "required": ["line_id", "summary", "suggestions"], }, } }, "required": ["alternatives"], } return self._mtg_openai_request_structured_payload( api_key=api_key, model_name=model_name, prompt=prompt, system_instruction=( "You are an expert Magic: The Gathering Commander deck builder. " "You repair illegal or awkward deck lines by proposing replacement cards " "that preserve deck function, mana curve, and commander synergy." ), response_name="mvd_tcg_mtg_alternative_suggestions", schema=schema, max_output_tokens=7200, ) def _mtg_openai_request_fill_selection(self, api_key, model_name, prompt): """Call the Responses API for a batch of deck-fill card selections.""" schema = { "type": "object", "additionalProperties": False, "properties": { "summary": {"type": "string"}, "cards": { "type": "array", "items": { "type": "object", "additionalProperties": False, "properties": { "card_id": {"type": "integer"}, "quantity": {"type": "integer"}, "reason": {"type": "string"}, }, "required": ["card_id", "quantity", "reason"], }, }, }, "required": ["summary", "cards"], } return self._mtg_openai_request_structured_payload( api_key=api_key, model_name=model_name, prompt=prompt, system_instruction=( "You are an expert Magic: The Gathering Commander deck builder. " "You complete partial decks from a bounded in-system card pool while keeping " "mana, interaction, draw, threat density, and commander synergy in balance." ), response_name="mvd_tcg_mtg_fill_selection", schema=schema, max_output_tokens=9600, ) def _mtg_openai_request_structured_payload( self, api_key, model_name, prompt, system_instruction, response_name, schema, max_output_tokens, ): """Call the Responses API for one structured JSON response. Args: api_key: OpenAI API key. model_name: OpenAI model identifier. prompt: Final user prompt. system_instruction: System message content. response_name: Stable JSON schema name. schema: JSON schema definition. max_output_tokens: Output token limit for the response. Returns: tuple[dict[str, object], str]: Parsed JSON payload and raw response. """ request_payload = { "model": model_name, "reasoning": {"effort": "low"}, "input": [ { "role": "system", "content": [ { "type": "input_text", "text": system_instruction, } ], }, { "role": "user", "content": [{"type": "input_text", "text": prompt}], }, ], "text": { "verbosity": "low", "format": { "type": "json_schema", "name": response_name, "schema": schema, "strict": True, } }, "max_output_tokens": max_output_tokens, } http_request = request.Request( f"{self._mtg_openai_get_api_base_url()}/responses", data=json.dumps(request_payload).encode("utf-8"), headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", }, method="POST", ) response_body = False for attempt in range(1, 4): try: with request.urlopen( http_request, timeout=self._mtg_openai_get_request_timeout_seconds(), ) as response: response_body = response.read().decode("utf-8") break except error.HTTPError as exc: error_body = exc.read().decode("utf-8", errors="replace") if exc.code in {429, 500, 502, 503, 504} and attempt < 3: time.sleep(attempt * 2) continue raise UserError( f"OpenAI request failed with status {exc.code}: {error_body}" ) from exc except error.URLError as exc: if attempt < 3: time.sleep(attempt * 2) continue raise UserError(f"OpenAI request failed: {exc.reason}") from exc response_payload = json.loads(response_body) if response_payload.get("status") == "incomplete": incomplete_reason = ( response_payload.get("incomplete_details", {}).get("reason") or "unknown" ) if incomplete_reason == "max_output_tokens": raise UserError( "OpenAI deck analysis hit the output token limit before it returned the final JSON." ) raise UserError( f"OpenAI deck analysis did not complete successfully: {incomplete_reason}." ) text_payload = cls._mtg_openai_extract_output_text(response_payload) try: analysis_payload = json.loads(text_payload) except json.JSONDecodeError as exc: raise UserError("OpenAI returned a non-JSON analysis payload.") from exc return analysis_payload, response_body @staticmethod def _mtg_openai_extract_output_text(response_payload): """Extract the text output from one Responses API payload. Args: response_payload: Decoded JSON payload returned by Responses. Returns: str: Concatenated text output. """ output_text = response_payload.get("output_text") if output_text: return output_text fragments = [] for output_item in response_payload.get("output", []): if output_item.get("type") != "message": continue for content_item in output_item.get("content", []): text_value = content_item.get("text") if text_value: fragments.append(text_value) if fragments: return "\n".join(fragments) raise UserError("OpenAI returned no text output for the deck analysis.") @staticmethod def _mtg_openai_render_paragraphs(text_value): """Render plain text paragraphs into safe Odoo HTML. Args: text_value: Plain text paragraph content. Returns: str | bool: Safe HTML markup or ``False`` if empty. """ clean_value = (text_value or "").strip() if not clean_value: return False paragraphs = [ f"
{html.escape(paragraph.strip())}
" for paragraph in clean_value.split("\n") if paragraph.strip() ] return "".join(paragraphs) or False @staticmethod def _mtg_openai_render_bullets(items): """Render a list of plain-text bullets into safe Odoo HTML. Args: items: Bullet strings. Returns: str | bool: Safe HTML markup or ``False`` if empty. """ cleaned_items = [ html.escape((item or "").strip()) for item in (items or []) if (item or "").strip() ] if not cleaned_items: return False bullet_markup = "".join(f"{html.escape(item['summary'])}
" if item["summary"] else "" ) if item["suggestions"]: suggestion_markup = "" for suggestion in item["suggestions"]: mana_markup = ( f" {html.escape(suggestion['mana_cost'])}" if suggestion["mana_cost"] else "" ) type_markup = ( f" {html.escape(suggestion['type_line'])}" if suggestion["type_line"] else "" ) reason_markup = ( f" - {html.escape(suggestion['reason'])}" if suggestion["reason"] else "" ) suggestion_markup += ( "Issue: {issue_markup}
" f"{summary_markup}" f"Added {total_added} card(s). " f"Remaining mainboard slots: {remaining_slots}.
" ) batch_markup = [] for index, batch in enumerate(fill_batches, start=1): summary_markup = ( f"{html.escape(batch['summary'])}
" if batch["summary"] else "" ) card_markup = "" for suggestion in batch["cards"]: mana_markup = ( f" {html.escape(suggestion['mana_cost'])}" if suggestion["mana_cost"] else "" ) type_markup = ( f" {html.escape(suggestion['type_line'])}" if suggestion["type_line"] else "" ) reason_markup = ( f" - {html.escape(suggestion['reason'])}" if suggestion["reason"] else "" ) card_markup += ( "