From 525877a27f5616a1fcb341a6a8923331f0f93efd Mon Sep 17 00:00:00 2001 From: Quentin Mondot Date: Thu, 16 Oct 2025 12:37:34 +0200 Subject: [PATCH] [IMP] survey_record_generation: first refacto of record creation --- .../models/survey_question.py | 6 +- .../models/survey_user_input.py | 322 +++++++++++++----- .../readme/CONTRIBUTORS.rst | 3 +- 3 files changed, 245 insertions(+), 86 deletions(-) diff --git a/survey_record_generation/models/survey_question.py b/survey_record_generation/models/survey_question.py index 6767727..4bb0478 100644 --- a/survey_record_generation/models/survey_question.py +++ b/survey_record_generation/models/survey_question.py @@ -1,12 +1,14 @@ import logging import ast +from typing import Literal from odoo import api, fields, models, _, Command from odoo.exceptions import UserError _logger = logging.getLogger(__name__) +AnswerValuesType: Literal["no", "value", "record"] class SurveyQuestion(models.Model): _inherit = 'survey.question' @@ -14,8 +16,8 @@ class SurveyQuestion(models.Model): model_id = fields.Many2one('ir.model', string="Model") model_name = fields.Char(related="model_id.model") fill_domain = fields.Char("Domain", default="[]") - answer_values_type = fields.Selection([('no', 'No values'),('value','Value'),('record','Record')], string="Associate value to answer", default="no", required=True) - + answer_values_type = fields.Selection([('no', 'No values'),('value','Value'),('record','Record')], string="Associate value to answer", default="no", required=True) + @api.onchange('model_id') def onchange_model_id(self): self.fill_domain = "[]" diff --git a/survey_record_generation/models/survey_user_input.py b/survey_record_generation/models/survey_user_input.py index bf376f2..6bca5e8 100644 --- a/survey_record_generation/models/survey_user_input.py +++ b/survey_record_generation/models/survey_user_input.py @@ -1,13 +1,24 @@ # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). -from odoo import models, fields, _ +from typing import TYPE_CHECKING, Any, Literal + +from odoo import _, fields, models from odoo.exceptions import UserError +if TYPE_CHECKING: + from .survey_question import AnswerValuesType + from .survey_record_creation import SurveyRecordCreation + from .survey_record_creation_field_values import SurveyRecordCreationFieldValues + class SurveyUserInput(models.Model): _inherit = "survey.user_input" - generated_record_ids = fields.One2many('survey.generated.record', 'user_input_id', 'Generated records') - generated_records_count = fields.Integer("Attempts Count", compute='_compute_generated_records_count') + generated_record_ids = fields.One2many( + "survey.generated.record", "user_input_id", "Generated records" + ) + generated_records_count = fields.Integer( + "Attempts Count", compute="_compute_generated_records_count" + ) def _compute_generated_records_count(self): for user_input in self: @@ -16,112 +27,257 @@ class SurveyUserInput(models.Model): def action_redirect_to_generated_records(self): self.ensure_one() - action = self.env['ir.actions.act_window']._for_xml_id('survey_record_generation.survey_generated_record_action') - """ context = dict(self.env.context or {}) - - context['create'] = False - context['search_default_survey_id'] = self.survey_id.id - context['search_default_group_by_survey'] = False - if self.partner_id: - context['search_default_partner_id'] = self.partner_id.id - elif self.email: - context['search_default_email'] = self.email - - action['context'] = context """ + action = self.env["ir.actions.act_window"]._for_xml_id( + "survey_record_generation.survey_generated_record_action" + ) return action - def _mark_done(self, ignore_when_res_partner_mandatory_fields_are_missing = False): + def _mark_done( + self, ignore_when_res_partner_mandatory_fields_are_missing: bool = False + ): # generate records for user_input in self: created_records = {} fields_to_update = [] - for record_creation in user_input.survey_id.survey_record_creation_ids.sorted('sequence'): - model = record_creation.model_id.model - vals = {} - ModelClass = self.env[model] + record_creation: SurveyRecordCreation + for ( + record_creation + ) in user_input.survey_id.survey_record_creation_ids.sorted("sequence"): + model: str = record_creation.model_id.model + vals: dict = {} + field_value: SurveyRecordCreationFieldValues for field_value in record_creation.field_values_ids: - if field_value.value_origin == 'fixed': - vals[field_value.field_id.name] = field_value.get_fixed_value_for_record_creation() - elif field_value.value_origin == 'question': - # find user_input_lines of the question - user_input_lines = [user_input_line for user_input_line in user_input.user_input_line_ids if user_input_line.question_id == field_value.question_id] + field_name: str = field_value.field_id.name + value: Any = None - if not user_input_lines: - continue - - if field_value.question_id.question_type in ['simple_choice', 'multiple_choice','matrix']: - if field_value.question_id.answer_values_type == 'record': - record_ids = [] - for user_input_line in user_input_lines: - if user_input_line.suggested_answer_id and user_input_line.suggested_answer_id.record_id: - record_ids.append(user_input_line.suggested_answer_id.record_id.id) - if field_value.question_id.question_type == 'simple_choice': - if record_ids: - vals[field_value.field_id.name] = record_ids[0] - else: - vals[field_value.field_id.name] = None - else: - vals[field_value.field_id.name] = record_ids - if field_value.question_id.answer_values_type == 'value': - if field_value.field_id.ttype == "boolean": - boolean_value = user_input_lines[0].suggested_answer_id.value_char in [True, 1, "1", "True", "true", "Oui", "oui", "Yes", "yes"] - vals[field_value.field_id.name] = boolean_value - else: - vals[field_value.field_id.name] = user_input_lines[0].suggested_answer_id.value_char - elif user_input_lines[0].answer_type: # if value not filled by user, answer_type not set - vals[field_value.field_id.name] = user_input_lines[0][f"value_{user_input_lines[0].answer_type}"] - else: - vals[field_value.field_id.name] = None - elif field_value.value_origin == 'other_record': + if field_value.value_origin == "fixed": + value = field_value.get_fixed_value_for_record_creation() + elif field_value.value_origin == "question": + value = self.get_value_from_user_answer(field_value, user_input) + elif field_value.value_origin == "other_record": fields_to_update.append(field_value) - # check if the field to update is mandatory - if ModelClass._fields[field_value.field_id.name].required: - # check if the other record is already created, if yes add it to vals - if len(created_records) > 0 and created_records[field_value.other_created_record_id.id]: - linked_record = created_records[field_value.other_created_record_id.id] - vals[field_value.field_id.name] = linked_record.id - else: - raise UserError( - _("The field %s is mandatory. In Record Creation tab, drag %s at the top of the table") - % (field_value.field_id.display_name, field_value.other_created_record_id.name) - ) - # check duplicates - uniq_fields = [field_value.field_id.name for field_value in record_creation.field_values_ids.filtered(lambda r:r.unicity_check)] - duplicate = None - if uniq_fields: - uniq_domain = [] - for uniq_field in uniq_fields: - uniq_domain.append((uniq_field,'=',vals[uniq_field])) - duplicate = self.env[model].search(uniq_domain, limit=1) + value = self.get_value_from_other_record( + model, created_records, field_value + ) + + vals[field_name] = value + + duplicate = self.find_duplicate_if_there_are_fields_with_unicity_check( + model, record_creation, vals + ) if duplicate: record = duplicate else: - if model == "res.partner" and ignore_when_res_partner_mandatory_fields_are_missing: - # this part has been developed for Calim specific needs : being able to create several Contacts with the same survey - # TODO : find a way to make it generic for all models ? + if ( + model == "res.partner" + and ignore_when_res_partner_mandatory_fields_are_missing + ): + # this part has been developed for Calim specific needs : + # being able to ignore some Contacts creation + # TODO : find a way to make it generic for all models if not vals.get("lastname") and not vals.get("firstname"): continue # Create record record = self.env[model].create(vals) # Link generated records to user input - self.env['survey.generated.record'].create({ - 'survey_record_creation_name':record_creation.name, - 'survey_record_creation_id':record_creation.id, - 'user_input_id':user_input.id, - "created_record_id":"%s,%s" % (model,record.id) - }) + self.env["survey.generated.record"].create( + { + "survey_record_creation_name": record_creation.name, + "survey_record_creation_id": record_creation.id, + "user_input_id": user_input.id, + "created_record_id": f"{model},{record.id}", + } + ) created_records[record_creation.id] = record # update linked records + # TODO : not covered by test, because I don't get the purpose + # TODO : maybe in case of duplicate ? for field_to_update in fields_to_update: - record_to_update = created_records.get(field_to_update.survey_record_creation_id.id) + record_to_update = created_records.get( + field_to_update.survey_record_creation_id.id + ) if record_to_update: - linked_record = created_records[field_to_update.other_created_record_id.id] - record_to_update.write({field_to_update.field_id.name:linked_record.id}) + linked_record = created_records[ + field_to_update.other_created_record_id.id + ] + record_to_update.write( + {field_to_update.field_id.name: linked_record.id} + ) return super()._mark_done() + + def find_duplicate_if_there_are_fields_with_unicity_check( + self, model: str, record_creation: "SurveyRecordCreation", vals: dict[Any, Any] + ) -> Any: + # check duplicates + unique_fields = [ + field_value.field_id.name + for field_value in record_creation.field_values_ids.filtered( + lambda r: r.unicity_check + ) + ] + duplicate = None + if unique_fields: + uniq_domain = [] + for uniq_field in unique_fields: + uniq_domain.append((uniq_field, "=", vals[uniq_field])) + duplicate = self.env[model].search(uniq_domain, limit=1) + return duplicate + + def get_value_from_other_record( + self, + model: str, + created_records: dict[Any, Any], + field_value: "SurveyRecordCreationFieldValues", + ) -> Any: + model_class = self.env[model] + # check if the field to update is mandatory + if model_class._fields[field_value.field_id.name].required: + # check if the other record is already created, if yes add it to vals + if ( + len(created_records) > 0 + and created_records[field_value.other_created_record_id.id] + ): + linked_record = created_records[field_value.other_created_record_id.id] + return linked_record.id + else: + raise UserError( + _( + "The field %(field)s is mandatory. In Record Creation tab, " + "drag %(record)s at the top of the table" + ) + % { + "field": field_value.field_id.display_name, + "record": field_value.other_created_record_id.name, + } + ) + else: + raise UserError( + _( + "[Survey record generation] Only required fields can be " + "set up with 'other_record' " + "(field %(field)s is not mandatory for model %(model)s)." + ) + % {"field": field_value.field_id.name, "model": model} + ) + + def get_value_from_user_answer( + self, + field_value: "SurveyRecordCreationFieldValues", + user_input: "SurveyUserInput", + ) -> Any: + # find user_input_lines (which are user's answers) for the question + user_input_lines = [ + user_input_line + for user_input_line in user_input.user_input_line_ids + if user_input_line.question_id == field_value.question_id + ] + + if not user_input_lines: + # If the question has not been displayed to the user, + # there are no user_input_lines + return None + if user_input_lines[0].skipped: + # The question has been ignored by the user + return None + + question_type = field_value.question_id.question_type + + if question_type in [ + "char_box", + "text_box", + "numerical_box", + "date", + "datetime", + ]: + return user_input_lines[0][f"value_{user_input_lines[0].answer_type}"] + elif question_type in ["simple_choice", "multiple_choice", "matrix"]: + answer_values_type = field_value.question_id.answer_values_type + return self.get_value_based_on_answer_values_type( + answer_values_type, field_value, question_type, user_input_lines + ) + else: + raise UserError( + _( + "[Survey record generation] The question type %(type)s is not " + "recognized (for question %(question)s)." + ) + % {"type": question_type, "question": field_value.question_id.title} + ) + + def get_value_based_on_answer_values_type( + self, + answer_values_type: "AnswerValuesType", + field_value: "SurveyRecordCreationFieldValues", + question_type: Literal["simple_choice", "multiple_choice", "matrix"], + user_input_lines: list[Any], + ) -> Any: + if answer_values_type == "record": + answered_record_ids = [] + for user_input_line in user_input_lines: + if ( + user_input_line.suggested_answer_id + and user_input_line.suggested_answer_id.record_id + ): + answered_record_ids.append( + user_input_line.suggested_answer_id.record_id.id + ) + if not answered_record_ids: + return None + if question_type == "simple_choice": + return answered_record_ids[0] + elif question_type == "multiple_choice": + return answered_record_ids + else: + raise UserError( + _( + "[Survey record generation] The question type" + " %(type)s is not supported yet." + ) + % {"type": question_type} + ) + elif answer_values_type == "value": + if field_value.field_id.ttype != "boolean": + return user_input_lines[0].suggested_answer_id.value_char + else: + answer_value_char = user_input_lines[0].suggested_answer_id.value_char + if boolean_value := answer_value_char in [ + True, + 1, + "1", + "True", + "true", + "Oui", + "oui", + "Yes", + "yes", + ]: + return boolean_value + else: + raise UserError( + _( + "[Survey record generation] The boolean value %s(value)s " + "is not supported (for question %(question)s)." + ) + % { + "value": answer_value_char, + "question": field_value.question_id.title, + } + ) + else: + raise UserError( + _( + "[Survey record generation] The answer values type %(type)s " + "is not supported (for question %(question)s)." + ) + % { + "type": answer_values_type, + "question": field_value.question_id.title, + } + ) diff --git a/survey_record_generation/readme/CONTRIBUTORS.rst b/survey_record_generation/readme/CONTRIBUTORS.rst index ed90ebe..6aaf080 100644 --- a/survey_record_generation/readme/CONTRIBUTORS.rst +++ b/survey_record_generation/readme/CONTRIBUTORS.rst @@ -1,3 +1,4 @@ * `Elabore `_ - * Clément Thomas \ No newline at end of file + * Clément Thomas + * Quentin Mondot