From c98823f3a330034ea231d7aa4deb02e9029abf98 Mon Sep 17 00:00:00 2001 From: Quentin Mondot Date: Thu, 16 Oct 2025 12:37:34 +0200 Subject: [PATCH] [IMP] survey_record_generation: first refacto of record creation --- .../models/survey_question.py | 6 +- .../models/survey_user_input.py | 171 +++++++++++------- .../readme/CONTRIBUTORS.rst | 3 +- 3 files changed, 115 insertions(+), 65 deletions(-) diff --git a/survey_record_generation/models/survey_question.py b/survey_record_generation/models/survey_question.py index 6767727..4bb0478 100644 --- a/survey_record_generation/models/survey_question.py +++ b/survey_record_generation/models/survey_question.py @@ -1,12 +1,14 @@ import logging import ast +from typing import Literal from odoo import api, fields, models, _, Command from odoo.exceptions import UserError _logger = logging.getLogger(__name__) +AnswerValuesType: Literal["no", "value", "record"] class SurveyQuestion(models.Model): _inherit = 'survey.question' @@ -14,8 +16,8 @@ class SurveyQuestion(models.Model): model_id = fields.Many2one('ir.model', string="Model") model_name = fields.Char(related="model_id.model") fill_domain = fields.Char("Domain", default="[]") - answer_values_type = fields.Selection([('no', 'No values'),('value','Value'),('record','Record')], string="Associate value to answer", default="no", required=True) - + answer_values_type = fields.Selection([('no', 'No values'),('value','Value'),('record','Record')], string="Associate value to answer", default="no", required=True) + @api.onchange('model_id') def onchange_model_id(self): self.fill_domain = "[]" diff --git a/survey_record_generation/models/survey_user_input.py b/survey_record_generation/models/survey_user_input.py index bf376f2..62dbadb 100644 --- a/survey_record_generation/models/survey_user_input.py +++ b/survey_record_generation/models/survey_user_input.py @@ -1,8 +1,9 @@ # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +from typing import Any, Literal + from odoo import models, fields, _ from odoo.exceptions import UserError - class SurveyUserInput(models.Model): _inherit = "survey.user_input" @@ -16,7 +17,8 @@ class SurveyUserInput(models.Model): def action_redirect_to_generated_records(self): self.ensure_one() - action = self.env['ir.actions.act_window']._for_xml_id('survey_record_generation.survey_generated_record_action') + action = self.env['ir.actions.act_window']._for_xml_id( + 'survey_record_generation.survey_generated_record_action') """ context = dict(self.env.context or {}) context['create'] = False @@ -31,71 +33,31 @@ class SurveyUserInput(models.Model): return action - def _mark_done(self, ignore_when_res_partner_mandatory_fields_are_missing = False): + def _mark_done(self, ignore_when_res_partner_mandatory_fields_are_missing=False): # generate records for user_input in self: created_records = {} fields_to_update = [] - for record_creation in user_input.survey_id.survey_record_creation_ids.sorted('sequence'): + for record_creation in user_input.survey_id.survey_record_creation_ids.sorted("sequence"): model = record_creation.model_id.model vals = {} - ModelClass = self.env[model] for field_value in record_creation.field_values_ids: - if field_value.value_origin == 'fixed': - vals[field_value.field_id.name] = field_value.get_fixed_value_for_record_creation() - elif field_value.value_origin == 'question': - # find user_input_lines of the question - user_input_lines = [user_input_line for user_input_line in user_input.user_input_line_ids if user_input_line.question_id == field_value.question_id] + field_name = field_value.field_id.name + value = None - if not user_input_lines: - continue - - if field_value.question_id.question_type in ['simple_choice', 'multiple_choice','matrix']: - if field_value.question_id.answer_values_type == 'record': - record_ids = [] - for user_input_line in user_input_lines: - if user_input_line.suggested_answer_id and user_input_line.suggested_answer_id.record_id: - record_ids.append(user_input_line.suggested_answer_id.record_id.id) - if field_value.question_id.question_type == 'simple_choice': - if record_ids: - vals[field_value.field_id.name] = record_ids[0] - else: - vals[field_value.field_id.name] = None - else: - vals[field_value.field_id.name] = record_ids - if field_value.question_id.answer_values_type == 'value': - if field_value.field_id.ttype == "boolean": - boolean_value = user_input_lines[0].suggested_answer_id.value_char in [True, 1, "1", "True", "true", "Oui", "oui", "Yes", "yes"] - vals[field_value.field_id.name] = boolean_value - else: - vals[field_value.field_id.name] = user_input_lines[0].suggested_answer_id.value_char - elif user_input_lines[0].answer_type: # if value not filled by user, answer_type not set - vals[field_value.field_id.name] = user_input_lines[0][f"value_{user_input_lines[0].answer_type}"] - else: - vals[field_value.field_id.name] = None - elif field_value.value_origin == 'other_record': + if field_value.value_origin == "fixed": + value = field_value.get_fixed_value_for_record_creation() + elif field_value.value_origin == "question": + value = self.get_value_from_user_answer(field_value, user_input) + elif field_value.value_origin == "other_record": fields_to_update.append(field_value) - # check if the field to update is mandatory - if ModelClass._fields[field_value.field_id.name].required: - # check if the other record is already created, if yes add it to vals - if len(created_records) > 0 and created_records[field_value.other_created_record_id.id]: - linked_record = created_records[field_value.other_created_record_id.id] - vals[field_value.field_id.name] = linked_record.id - else: - raise UserError( - _("The field %s is mandatory. In Record Creation tab, drag %s at the top of the table") - % (field_value.field_id.display_name, field_value.other_created_record_id.name) - ) - # check duplicates - uniq_fields = [field_value.field_id.name for field_value in record_creation.field_values_ids.filtered(lambda r:r.unicity_check)] - duplicate = None - if uniq_fields: - uniq_domain = [] - for uniq_field in uniq_fields: - uniq_domain.append((uniq_field,'=',vals[uniq_field])) - duplicate = self.env[model].search(uniq_domain, limit=1) + value = self.get_value_from_other_record(model, created_records, field_value, value) + + vals[field_name] = value + + duplicate = self.find_duplicate_if_there_are_fields_with_unicity_check(model, record_creation, vals) if duplicate: record = duplicate @@ -108,20 +70,105 @@ class SurveyUserInput(models.Model): # Create record record = self.env[model].create(vals) # Link generated records to user input - self.env['survey.generated.record'].create({ - 'survey_record_creation_name':record_creation.name, - 'survey_record_creation_id':record_creation.id, - 'user_input_id':user_input.id, - "created_record_id":"%s,%s" % (model,record.id) + self.env["survey.generated.record"].create({ + "survey_record_creation_name": record_creation.name, + "survey_record_creation_id": record_creation.id, + "user_input_id": user_input.id, + "created_record_id": "%s,%s" % (model, record.id) }) created_records[record_creation.id] = record # update linked records + # TODO : not covered by test, because I don't get the purpose + # TODO : maybe in case of duplicate ? for field_to_update in fields_to_update: record_to_update = created_records.get(field_to_update.survey_record_creation_id.id) if record_to_update: linked_record = created_records[field_to_update.other_created_record_id.id] - record_to_update.write({field_to_update.field_id.name:linked_record.id}) + record_to_update.write({field_to_update.field_id.name: linked_record.id}) return super()._mark_done() + + def find_duplicate_if_there_are_fields_with_unicity_check(self, model, record_creation, vals: dict[Any, Any]) -> Any: + # check duplicates + unique_fields = [field_value.field_id.name for field_value in + record_creation.field_values_ids.filtered(lambda r: r.unicity_check)] + duplicate = None + if unique_fields: + uniq_domain = [] + for uniq_field in unique_fields: + uniq_domain.append((uniq_field, '=', vals[uniq_field])) + duplicate = self.env[model].search(uniq_domain, limit=1) + return duplicate + + def get_value_from_other_record(self, model, created_records: dict[Any, Any], field_value, value) -> Any: + ModelClass = self.env[model] + # check if the field to update is mandatory + if ModelClass._fields[field_value.field_id.name].required: + # check if the other record is already created, if yes add it to vals + if len(created_records) > 0 and created_records[field_value.other_created_record_id.id]: + linked_record = created_records[field_value.other_created_record_id.id] + value = linked_record.id + else: + raise UserError( + _("The field %s is mandatory. In Record Creation tab, drag %s at the top of the table") + % (field_value.field_id.display_name, field_value.other_created_record_id.name) + ) + return value + + def get_value_from_user_answer(self, field_value, user_input) -> Any: + # find user_input_lines (which are user's answers) for the question + user_input_lines = [user_input_line for user_input_line in user_input.user_input_line_ids if + user_input_line.question_id == field_value.question_id] + + if not user_input_lines: + # If the question has not been displayed to the user, there are no user_input_lines + return None + if user_input_lines[0].skipped: + # The question has been ignored by the user + return None + + question_type = field_value.question_id.question_type + + if question_type in ["char_box", "text_box", "numerical_box", "date", "datetime"]: + return user_input_lines[0][f"value_{user_input_lines[0].answer_type}"] + elif question_type in ["simple_choice", "multiple_choice", "matrix"]: + answer_values_type = field_value.question_id.answer_values_type + return self.get_value_based_on_answer_values_type(answer_values_type, field_value, question_type, + user_input_lines) + else: + raise UserError( + "[Survey record generation] The question type %s is not recognized (for question %s)." % question_type, + field_value.question_id.title + ) + + def get_value_based_on_answer_values_type(self, answer_values_type, field_value, + question_type: Literal["simple_choice", "multiple_choice", "matrix"], + user_input_lines: list[Any]) -> Any: + if answer_values_type == "record": + answered_record_ids = [] + for user_input_line in user_input_lines: + if user_input_line.suggested_answer_id and user_input_line.suggested_answer_id.record_id: + answered_record_ids.append(user_input_line.suggested_answer_id.record_id.id) + if not answered_record_ids: + return None + if question_type == "simple_choice": + return answered_record_ids[0] + elif question_type == "multiple_choice": + return answered_record_ids + else: + raise UserError( + "[Survey record generation] The question type %s is not supported yet." % question_type) + elif answer_values_type == "value": + if field_value.field_id.ttype == "boolean": + boolean_value = user_input_lines[0].suggested_answer_id.value_char in [True, 1, "1", "True", "true", + "Oui", "oui", "Yes", "yes"] + return boolean_value + else: + return user_input_lines[0].suggested_answer_id.value_char + else: + raise UserError( + "[Survey record generation] The answer values type %s is not supported (for question %s)." % answer_values_type, + field_value.question_id.title + ) diff --git a/survey_record_generation/readme/CONTRIBUTORS.rst b/survey_record_generation/readme/CONTRIBUTORS.rst index ed90ebe..6aaf080 100644 --- a/survey_record_generation/readme/CONTRIBUTORS.rst +++ b/survey_record_generation/readme/CONTRIBUTORS.rst @@ -1,3 +1,4 @@ * `Elabore `_ - * Clément Thomas \ No newline at end of file + * Clément Thomas + * Quentin Mondot