[IMP] survey_record_generation: first refacto of record creation
This commit is contained in:
@@ -1,12 +1,14 @@
|
||||
|
||||
import logging
|
||||
import ast
|
||||
from typing import Literal
|
||||
|
||||
from odoo import api, fields, models, _, Command
|
||||
from odoo.exceptions import UserError
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
AnswerValuesType: Literal["no", "value", "record"]
|
||||
|
||||
class SurveyQuestion(models.Model):
|
||||
_inherit = 'survey.question'
|
||||
@@ -14,8 +16,8 @@ class SurveyQuestion(models.Model):
|
||||
model_id = fields.Many2one('ir.model', string="Model")
|
||||
model_name = fields.Char(related="model_id.model")
|
||||
fill_domain = fields.Char("Domain", default="[]")
|
||||
answer_values_type = fields.Selection([('no', 'No values'),('value','Value'),('record','Record')], string="Associate value to answer", default="no", required=True)
|
||||
|
||||
answer_values_type = fields.Selection([('no', 'No values'),('value','Value'),('record','Record')], string="Associate value to answer", default="no", required=True)
|
||||
|
||||
@api.onchange('model_id')
|
||||
def onchange_model_id(self):
|
||||
self.fill_domain = "[]"
|
||||
|
||||
@@ -1,13 +1,24 @@
|
||||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
|
||||
from odoo import models, fields, _
|
||||
from typing import TYPE_CHECKING, Any, Literal
|
||||
|
||||
from odoo import _, fields, models
|
||||
from odoo.exceptions import UserError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .survey_question import AnswerValuesType
|
||||
from .survey_record_creation import SurveyRecordCreation
|
||||
from .survey_record_creation_field_values import SurveyRecordCreationFieldValues
|
||||
|
||||
|
||||
class SurveyUserInput(models.Model):
|
||||
_inherit = "survey.user_input"
|
||||
|
||||
generated_record_ids = fields.One2many('survey.generated.record', 'user_input_id', 'Generated records')
|
||||
generated_records_count = fields.Integer("Attempts Count", compute='_compute_generated_records_count')
|
||||
generated_record_ids = fields.One2many(
|
||||
"survey.generated.record", "user_input_id", "Generated records"
|
||||
)
|
||||
generated_records_count = fields.Integer(
|
||||
"Attempts Count", compute="_compute_generated_records_count"
|
||||
)
|
||||
|
||||
def _compute_generated_records_count(self):
|
||||
for user_input in self:
|
||||
@@ -16,112 +27,257 @@ class SurveyUserInput(models.Model):
|
||||
def action_redirect_to_generated_records(self):
|
||||
self.ensure_one()
|
||||
|
||||
action = self.env['ir.actions.act_window']._for_xml_id('survey_record_generation.survey_generated_record_action')
|
||||
""" context = dict(self.env.context or {})
|
||||
|
||||
context['create'] = False
|
||||
context['search_default_survey_id'] = self.survey_id.id
|
||||
context['search_default_group_by_survey'] = False
|
||||
if self.partner_id:
|
||||
context['search_default_partner_id'] = self.partner_id.id
|
||||
elif self.email:
|
||||
context['search_default_email'] = self.email
|
||||
|
||||
action['context'] = context """
|
||||
action = self.env["ir.actions.act_window"]._for_xml_id(
|
||||
"survey_record_generation.survey_generated_record_action"
|
||||
)
|
||||
|
||||
return action
|
||||
|
||||
def _mark_done(self, ignore_when_res_partner_mandatory_fields_are_missing = False):
|
||||
def _mark_done(
|
||||
self, ignore_when_res_partner_mandatory_fields_are_missing: bool = False
|
||||
):
|
||||
# generate records
|
||||
for user_input in self:
|
||||
created_records = {}
|
||||
fields_to_update = []
|
||||
|
||||
for record_creation in user_input.survey_id.survey_record_creation_ids.sorted('sequence'):
|
||||
model = record_creation.model_id.model
|
||||
vals = {}
|
||||
ModelClass = self.env[model]
|
||||
record_creation: SurveyRecordCreation
|
||||
for (
|
||||
record_creation
|
||||
) in user_input.survey_id.survey_record_creation_ids.sorted("sequence"):
|
||||
model: str = record_creation.model_id.model
|
||||
vals: dict = {}
|
||||
|
||||
field_value: SurveyRecordCreationFieldValues
|
||||
for field_value in record_creation.field_values_ids:
|
||||
if field_value.value_origin == 'fixed':
|
||||
vals[field_value.field_id.name] = field_value.get_fixed_value_for_record_creation()
|
||||
elif field_value.value_origin == 'question':
|
||||
# find user_input_lines of the question
|
||||
user_input_lines = [user_input_line for user_input_line in user_input.user_input_line_ids if user_input_line.question_id == field_value.question_id]
|
||||
field_name: str = field_value.field_id.name
|
||||
value: Any = None
|
||||
|
||||
if not user_input_lines:
|
||||
continue
|
||||
|
||||
if field_value.question_id.question_type in ['simple_choice', 'multiple_choice','matrix']:
|
||||
if field_value.question_id.answer_values_type == 'record':
|
||||
record_ids = []
|
||||
for user_input_line in user_input_lines:
|
||||
if user_input_line.suggested_answer_id and user_input_line.suggested_answer_id.record_id:
|
||||
record_ids.append(user_input_line.suggested_answer_id.record_id.id)
|
||||
if field_value.question_id.question_type == 'simple_choice':
|
||||
if record_ids:
|
||||
vals[field_value.field_id.name] = record_ids[0]
|
||||
else:
|
||||
vals[field_value.field_id.name] = None
|
||||
else:
|
||||
vals[field_value.field_id.name] = record_ids
|
||||
if field_value.question_id.answer_values_type == 'value':
|
||||
if field_value.field_id.ttype == "boolean":
|
||||
boolean_value = user_input_lines[0].suggested_answer_id.value_char in [True, 1, "1", "True", "true", "Oui", "oui", "Yes", "yes"]
|
||||
vals[field_value.field_id.name] = boolean_value
|
||||
else:
|
||||
vals[field_value.field_id.name] = user_input_lines[0].suggested_answer_id.value_char
|
||||
elif user_input_lines[0].answer_type: # if value not filled by user, answer_type not set
|
||||
vals[field_value.field_id.name] = user_input_lines[0][f"value_{user_input_lines[0].answer_type}"]
|
||||
else:
|
||||
vals[field_value.field_id.name] = None
|
||||
elif field_value.value_origin == 'other_record':
|
||||
if field_value.value_origin == "fixed":
|
||||
value = field_value.get_fixed_value_for_record_creation()
|
||||
elif field_value.value_origin == "question":
|
||||
value = self.get_value_from_user_answer(field_value, user_input)
|
||||
elif field_value.value_origin == "other_record":
|
||||
fields_to_update.append(field_value)
|
||||
# check if the field to update is mandatory
|
||||
if ModelClass._fields[field_value.field_id.name].required:
|
||||
# check if the other record is already created, if yes add it to vals
|
||||
if len(created_records) > 0 and created_records[field_value.other_created_record_id.id]:
|
||||
linked_record = created_records[field_value.other_created_record_id.id]
|
||||
vals[field_value.field_id.name] = linked_record.id
|
||||
else:
|
||||
raise UserError(
|
||||
_("The field %s is mandatory. In Record Creation tab, drag %s at the top of the table")
|
||||
% (field_value.field_id.display_name, field_value.other_created_record_id.name)
|
||||
)
|
||||
# check duplicates
|
||||
uniq_fields = [field_value.field_id.name for field_value in record_creation.field_values_ids.filtered(lambda r:r.unicity_check)]
|
||||
duplicate = None
|
||||
if uniq_fields:
|
||||
uniq_domain = []
|
||||
for uniq_field in uniq_fields:
|
||||
uniq_domain.append((uniq_field,'=',vals[uniq_field]))
|
||||
duplicate = self.env[model].search(uniq_domain, limit=1)
|
||||
value = self.get_value_from_other_record(
|
||||
model, created_records, field_value
|
||||
)
|
||||
|
||||
vals[field_name] = value
|
||||
|
||||
duplicate = self.find_duplicate_if_there_are_fields_with_unicity_check(
|
||||
model, record_creation, vals
|
||||
)
|
||||
|
||||
if duplicate:
|
||||
record = duplicate
|
||||
else:
|
||||
if model == "res.partner" and ignore_when_res_partner_mandatory_fields_are_missing:
|
||||
# this part has been developed for Calim specific needs : being able to create several Contacts with the same survey
|
||||
# TODO : find a way to make it generic for all models ?
|
||||
if (
|
||||
model == "res.partner"
|
||||
and ignore_when_res_partner_mandatory_fields_are_missing
|
||||
):
|
||||
# this part has been developed for Calim specific needs :
|
||||
# being able to ignore some Contacts creation
|
||||
# TODO : find a way to make it generic for all models
|
||||
if not vals.get("lastname") and not vals.get("firstname"):
|
||||
continue
|
||||
# Create record
|
||||
record = self.env[model].create(vals)
|
||||
# Link generated records to user input
|
||||
self.env['survey.generated.record'].create({
|
||||
'survey_record_creation_name':record_creation.name,
|
||||
'survey_record_creation_id':record_creation.id,
|
||||
'user_input_id':user_input.id,
|
||||
"created_record_id":"%s,%s" % (model,record.id)
|
||||
})
|
||||
self.env["survey.generated.record"].create(
|
||||
{
|
||||
"survey_record_creation_name": record_creation.name,
|
||||
"survey_record_creation_id": record_creation.id,
|
||||
"user_input_id": user_input.id,
|
||||
"created_record_id": f"{model},{record.id}",
|
||||
}
|
||||
)
|
||||
|
||||
created_records[record_creation.id] = record
|
||||
|
||||
# update linked records
|
||||
# TODO : not covered by test, because I don't get the purpose
|
||||
# TODO : maybe in case of duplicate ?
|
||||
for field_to_update in fields_to_update:
|
||||
record_to_update = created_records.get(field_to_update.survey_record_creation_id.id)
|
||||
record_to_update = created_records.get(
|
||||
field_to_update.survey_record_creation_id.id
|
||||
)
|
||||
if record_to_update:
|
||||
linked_record = created_records[field_to_update.other_created_record_id.id]
|
||||
record_to_update.write({field_to_update.field_id.name:linked_record.id})
|
||||
linked_record = created_records[
|
||||
field_to_update.other_created_record_id.id
|
||||
]
|
||||
record_to_update.write(
|
||||
{field_to_update.field_id.name: linked_record.id}
|
||||
)
|
||||
|
||||
return super()._mark_done()
|
||||
|
||||
def find_duplicate_if_there_are_fields_with_unicity_check(
|
||||
self, model: str, record_creation: "SurveyRecordCreation", vals: dict[Any, Any]
|
||||
) -> Any:
|
||||
# check duplicates
|
||||
unique_fields = [
|
||||
field_value.field_id.name
|
||||
for field_value in record_creation.field_values_ids.filtered(
|
||||
lambda r: r.unicity_check
|
||||
)
|
||||
]
|
||||
duplicate = None
|
||||
if unique_fields:
|
||||
uniq_domain = []
|
||||
for uniq_field in unique_fields:
|
||||
uniq_domain.append((uniq_field, "=", vals[uniq_field]))
|
||||
duplicate = self.env[model].search(uniq_domain, limit=1)
|
||||
return duplicate
|
||||
|
||||
def get_value_from_other_record(
|
||||
self,
|
||||
model: str,
|
||||
created_records: dict[Any, Any],
|
||||
field_value: "SurveyRecordCreationFieldValues",
|
||||
) -> Any:
|
||||
model_class = self.env[model]
|
||||
# check if the field to update is mandatory
|
||||
if model_class._fields[field_value.field_id.name].required:
|
||||
# check if the other record is already created, if yes add it to vals
|
||||
if (
|
||||
len(created_records) > 0
|
||||
and created_records[field_value.other_created_record_id.id]
|
||||
):
|
||||
linked_record = created_records[field_value.other_created_record_id.id]
|
||||
return linked_record.id
|
||||
else:
|
||||
raise UserError(
|
||||
_(
|
||||
"The field %(field)s is mandatory. In Record Creation tab, "
|
||||
"drag %(record)s at the top of the table"
|
||||
)
|
||||
% {
|
||||
"field": field_value.field_id.display_name,
|
||||
"record": field_value.other_created_record_id.name,
|
||||
}
|
||||
)
|
||||
else:
|
||||
raise UserError(
|
||||
_(
|
||||
"[Survey record generation] Only required fields can be "
|
||||
"set up with 'other_record' "
|
||||
"(field %(field)s is not mandatory for model %(model)s)."
|
||||
)
|
||||
% {"field": field_value.field_id.name, "model": model}
|
||||
)
|
||||
|
||||
def get_value_from_user_answer(
|
||||
self,
|
||||
field_value: "SurveyRecordCreationFieldValues",
|
||||
user_input: "SurveyUserInput",
|
||||
) -> Any:
|
||||
# find user_input_lines (which are user's answers) for the question
|
||||
user_input_lines = [
|
||||
user_input_line
|
||||
for user_input_line in user_input.user_input_line_ids
|
||||
if user_input_line.question_id == field_value.question_id
|
||||
]
|
||||
|
||||
if not user_input_lines:
|
||||
# If the question has not been displayed to the user,
|
||||
# there are no user_input_lines
|
||||
return None
|
||||
if user_input_lines[0].skipped:
|
||||
# The question has been ignored by the user
|
||||
return None
|
||||
|
||||
question_type = field_value.question_id.question_type
|
||||
|
||||
if question_type in [
|
||||
"char_box",
|
||||
"text_box",
|
||||
"numerical_box",
|
||||
"date",
|
||||
"datetime",
|
||||
]:
|
||||
return user_input_lines[0][f"value_{user_input_lines[0].answer_type}"]
|
||||
elif question_type in ["simple_choice", "multiple_choice", "matrix"]:
|
||||
answer_values_type = field_value.question_id.answer_values_type
|
||||
return self.get_value_based_on_answer_values_type(
|
||||
answer_values_type, field_value, question_type, user_input_lines
|
||||
)
|
||||
else:
|
||||
raise UserError(
|
||||
_(
|
||||
"[Survey record generation] The question type %(type)s is not "
|
||||
"recognized (for question %(question)s)."
|
||||
)
|
||||
% {"type": question_type, "question": field_value.question_id.title}
|
||||
)
|
||||
|
||||
def get_value_based_on_answer_values_type(
|
||||
self,
|
||||
answer_values_type: "AnswerValuesType",
|
||||
field_value: "SurveyRecordCreationFieldValues",
|
||||
question_type: Literal["simple_choice", "multiple_choice", "matrix"],
|
||||
user_input_lines: list[Any],
|
||||
) -> Any:
|
||||
if answer_values_type == "record":
|
||||
answered_record_ids = []
|
||||
for user_input_line in user_input_lines:
|
||||
if (
|
||||
user_input_line.suggested_answer_id
|
||||
and user_input_line.suggested_answer_id.record_id
|
||||
):
|
||||
answered_record_ids.append(
|
||||
user_input_line.suggested_answer_id.record_id.id
|
||||
)
|
||||
if not answered_record_ids:
|
||||
return None
|
||||
if question_type == "simple_choice":
|
||||
return answered_record_ids[0]
|
||||
elif question_type == "multiple_choice":
|
||||
return answered_record_ids
|
||||
else:
|
||||
raise UserError(
|
||||
_(
|
||||
"[Survey record generation] The question type"
|
||||
" %(type)s is not supported yet."
|
||||
)
|
||||
% {"type": question_type}
|
||||
)
|
||||
elif answer_values_type == "value":
|
||||
if field_value.field_id.ttype != "boolean":
|
||||
return user_input_lines[0].suggested_answer_id.value_char
|
||||
else:
|
||||
answer_value_char = user_input_lines[0].suggested_answer_id.value_char
|
||||
if boolean_value := answer_value_char in [
|
||||
True,
|
||||
1,
|
||||
"1",
|
||||
"True",
|
||||
"true",
|
||||
"Oui",
|
||||
"oui",
|
||||
"Yes",
|
||||
"yes",
|
||||
]:
|
||||
return boolean_value
|
||||
else:
|
||||
raise UserError(
|
||||
_(
|
||||
"[Survey record generation] The boolean value %s(value)s "
|
||||
"is not supported (for question %(question)s)."
|
||||
)
|
||||
% {
|
||||
"value": answer_value_char,
|
||||
"question": field_value.question_id.title,
|
||||
}
|
||||
)
|
||||
else:
|
||||
raise UserError(
|
||||
_(
|
||||
"[Survey record generation] The answer values type %(type)s "
|
||||
"is not supported (for question %(question)s)."
|
||||
)
|
||||
% {
|
||||
"type": answer_values_type,
|
||||
"question": field_value.question_id.title,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
* `Elabore <https://www.elabore.coop>`_
|
||||
|
||||
* Clément Thomas
|
||||
* Clément Thomas
|
||||
* Quentin Mondot
|
||||
|
||||
Reference in New Issue
Block a user