[IMP] survey_record_generation: first refacto of record creation
This commit is contained in:
@@ -1,12 +1,14 @@
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
import ast
|
import ast
|
||||||
|
from typing import Literal
|
||||||
|
|
||||||
from odoo import api, fields, models, _, Command
|
from odoo import api, fields, models, _, Command
|
||||||
from odoo.exceptions import UserError
|
from odoo.exceptions import UserError
|
||||||
|
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
AnswerValuesType: Literal["no", "value", "record"]
|
||||||
|
|
||||||
class SurveyQuestion(models.Model):
|
class SurveyQuestion(models.Model):
|
||||||
_inherit = 'survey.question'
|
_inherit = 'survey.question'
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
|
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
|
||||||
|
from typing import Any, Literal
|
||||||
|
|
||||||
from odoo import models, fields, _
|
from odoo import models, fields, _
|
||||||
from odoo.exceptions import UserError
|
from odoo.exceptions import UserError
|
||||||
|
|
||||||
|
|
||||||
class SurveyUserInput(models.Model):
|
class SurveyUserInput(models.Model):
|
||||||
_inherit = "survey.user_input"
|
_inherit = "survey.user_input"
|
||||||
|
|
||||||
@@ -16,7 +17,8 @@ class SurveyUserInput(models.Model):
|
|||||||
def action_redirect_to_generated_records(self):
|
def action_redirect_to_generated_records(self):
|
||||||
self.ensure_one()
|
self.ensure_one()
|
||||||
|
|
||||||
action = self.env['ir.actions.act_window']._for_xml_id('survey_record_generation.survey_generated_record_action')
|
action = self.env['ir.actions.act_window']._for_xml_id(
|
||||||
|
'survey_record_generation.survey_generated_record_action')
|
||||||
""" context = dict(self.env.context or {})
|
""" context = dict(self.env.context or {})
|
||||||
|
|
||||||
context['create'] = False
|
context['create'] = False
|
||||||
@@ -31,71 +33,31 @@ class SurveyUserInput(models.Model):
|
|||||||
|
|
||||||
return action
|
return action
|
||||||
|
|
||||||
def _mark_done(self, ignore_when_res_partner_mandatory_fields_are_missing = False):
|
def _mark_done(self, ignore_when_res_partner_mandatory_fields_are_missing=False):
|
||||||
# generate records
|
# generate records
|
||||||
for user_input in self:
|
for user_input in self:
|
||||||
created_records = {}
|
created_records = {}
|
||||||
fields_to_update = []
|
fields_to_update = []
|
||||||
|
|
||||||
for record_creation in user_input.survey_id.survey_record_creation_ids.sorted('sequence'):
|
for record_creation in user_input.survey_id.survey_record_creation_ids.sorted("sequence"):
|
||||||
model = record_creation.model_id.model
|
model = record_creation.model_id.model
|
||||||
vals = {}
|
vals = {}
|
||||||
ModelClass = self.env[model]
|
|
||||||
|
|
||||||
for field_value in record_creation.field_values_ids:
|
for field_value in record_creation.field_values_ids:
|
||||||
if field_value.value_origin == 'fixed':
|
field_name = field_value.field_id.name
|
||||||
vals[field_value.field_id.name] = field_value.get_fixed_value_for_record_creation()
|
value = None
|
||||||
elif field_value.value_origin == 'question':
|
|
||||||
# find user_input_lines of the question
|
|
||||||
user_input_lines = [user_input_line for user_input_line in user_input.user_input_line_ids if user_input_line.question_id == field_value.question_id]
|
|
||||||
|
|
||||||
if not user_input_lines:
|
if field_value.value_origin == "fixed":
|
||||||
continue
|
value = field_value.get_fixed_value_for_record_creation()
|
||||||
|
elif field_value.value_origin == "question":
|
||||||
if field_value.question_id.question_type in ['simple_choice', 'multiple_choice','matrix']:
|
value = self.get_value_from_user_answer(field_value, user_input)
|
||||||
if field_value.question_id.answer_values_type == 'record':
|
elif field_value.value_origin == "other_record":
|
||||||
record_ids = []
|
|
||||||
for user_input_line in user_input_lines:
|
|
||||||
if user_input_line.suggested_answer_id and user_input_line.suggested_answer_id.record_id:
|
|
||||||
record_ids.append(user_input_line.suggested_answer_id.record_id.id)
|
|
||||||
if field_value.question_id.question_type == 'simple_choice':
|
|
||||||
if record_ids:
|
|
||||||
vals[field_value.field_id.name] = record_ids[0]
|
|
||||||
else:
|
|
||||||
vals[field_value.field_id.name] = None
|
|
||||||
else:
|
|
||||||
vals[field_value.field_id.name] = record_ids
|
|
||||||
if field_value.question_id.answer_values_type == 'value':
|
|
||||||
if field_value.field_id.ttype == "boolean":
|
|
||||||
boolean_value = user_input_lines[0].suggested_answer_id.value_char in [True, 1, "1", "True", "true", "Oui", "oui", "Yes", "yes"]
|
|
||||||
vals[field_value.field_id.name] = boolean_value
|
|
||||||
else:
|
|
||||||
vals[field_value.field_id.name] = user_input_lines[0].suggested_answer_id.value_char
|
|
||||||
elif user_input_lines[0].answer_type: # if value not filled by user, answer_type not set
|
|
||||||
vals[field_value.field_id.name] = user_input_lines[0][f"value_{user_input_lines[0].answer_type}"]
|
|
||||||
else:
|
|
||||||
vals[field_value.field_id.name] = None
|
|
||||||
elif field_value.value_origin == 'other_record':
|
|
||||||
fields_to_update.append(field_value)
|
fields_to_update.append(field_value)
|
||||||
# check if the field to update is mandatory
|
value = self.get_value_from_other_record(model, created_records, field_value, value)
|
||||||
if ModelClass._fields[field_value.field_id.name].required:
|
|
||||||
# check if the other record is already created, if yes add it to vals
|
vals[field_name] = value
|
||||||
if len(created_records) > 0 and created_records[field_value.other_created_record_id.id]:
|
|
||||||
linked_record = created_records[field_value.other_created_record_id.id]
|
duplicate = self.find_duplicate_if_there_are_fields_with_unicity_check(model, record_creation, vals)
|
||||||
vals[field_value.field_id.name] = linked_record.id
|
|
||||||
else:
|
|
||||||
raise UserError(
|
|
||||||
_("The field %s is mandatory. In Record Creation tab, drag %s at the top of the table")
|
|
||||||
% (field_value.field_id.display_name, field_value.other_created_record_id.name)
|
|
||||||
)
|
|
||||||
# check duplicates
|
|
||||||
uniq_fields = [field_value.field_id.name for field_value in record_creation.field_values_ids.filtered(lambda r:r.unicity_check)]
|
|
||||||
duplicate = None
|
|
||||||
if uniq_fields:
|
|
||||||
uniq_domain = []
|
|
||||||
for uniq_field in uniq_fields:
|
|
||||||
uniq_domain.append((uniq_field,'=',vals[uniq_field]))
|
|
||||||
duplicate = self.env[model].search(uniq_domain, limit=1)
|
|
||||||
|
|
||||||
if duplicate:
|
if duplicate:
|
||||||
record = duplicate
|
record = duplicate
|
||||||
@@ -108,20 +70,105 @@ class SurveyUserInput(models.Model):
|
|||||||
# Create record
|
# Create record
|
||||||
record = self.env[model].create(vals)
|
record = self.env[model].create(vals)
|
||||||
# Link generated records to user input
|
# Link generated records to user input
|
||||||
self.env['survey.generated.record'].create({
|
self.env["survey.generated.record"].create({
|
||||||
'survey_record_creation_name':record_creation.name,
|
"survey_record_creation_name": record_creation.name,
|
||||||
'survey_record_creation_id':record_creation.id,
|
"survey_record_creation_id": record_creation.id,
|
||||||
'user_input_id':user_input.id,
|
"user_input_id": user_input.id,
|
||||||
"created_record_id":"%s,%s" % (model,record.id)
|
"created_record_id": "%s,%s" % (model, record.id)
|
||||||
})
|
})
|
||||||
|
|
||||||
created_records[record_creation.id] = record
|
created_records[record_creation.id] = record
|
||||||
|
|
||||||
# update linked records
|
# update linked records
|
||||||
|
# TODO : not covered by test, because I don't get the purpose
|
||||||
|
# TODO : maybe in case of duplicate ?
|
||||||
for field_to_update in fields_to_update:
|
for field_to_update in fields_to_update:
|
||||||
record_to_update = created_records.get(field_to_update.survey_record_creation_id.id)
|
record_to_update = created_records.get(field_to_update.survey_record_creation_id.id)
|
||||||
if record_to_update:
|
if record_to_update:
|
||||||
linked_record = created_records[field_to_update.other_created_record_id.id]
|
linked_record = created_records[field_to_update.other_created_record_id.id]
|
||||||
record_to_update.write({field_to_update.field_id.name:linked_record.id})
|
record_to_update.write({field_to_update.field_id.name: linked_record.id})
|
||||||
|
|
||||||
return super()._mark_done()
|
return super()._mark_done()
|
||||||
|
|
||||||
|
def find_duplicate_if_there_are_fields_with_unicity_check(self, model, record_creation, vals: dict[Any, Any]) -> Any:
|
||||||
|
# check duplicates
|
||||||
|
unique_fields = [field_value.field_id.name for field_value in
|
||||||
|
record_creation.field_values_ids.filtered(lambda r: r.unicity_check)]
|
||||||
|
duplicate = None
|
||||||
|
if unique_fields:
|
||||||
|
uniq_domain = []
|
||||||
|
for uniq_field in unique_fields:
|
||||||
|
uniq_domain.append((uniq_field, '=', vals[uniq_field]))
|
||||||
|
duplicate = self.env[model].search(uniq_domain, limit=1)
|
||||||
|
return duplicate
|
||||||
|
|
||||||
|
def get_value_from_other_record(self, model, created_records: dict[Any, Any], field_value, value) -> Any:
|
||||||
|
ModelClass = self.env[model]
|
||||||
|
# check if the field to update is mandatory
|
||||||
|
if ModelClass._fields[field_value.field_id.name].required:
|
||||||
|
# check if the other record is already created, if yes add it to vals
|
||||||
|
if len(created_records) > 0 and created_records[field_value.other_created_record_id.id]:
|
||||||
|
linked_record = created_records[field_value.other_created_record_id.id]
|
||||||
|
value = linked_record.id
|
||||||
|
else:
|
||||||
|
raise UserError(
|
||||||
|
_("The field %s is mandatory. In Record Creation tab, drag %s at the top of the table")
|
||||||
|
% (field_value.field_id.display_name, field_value.other_created_record_id.name)
|
||||||
|
)
|
||||||
|
return value
|
||||||
|
|
||||||
|
def get_value_from_user_answer(self, field_value, user_input) -> Any:
|
||||||
|
# find user_input_lines (which are user's answers) for the question
|
||||||
|
user_input_lines = [user_input_line for user_input_line in user_input.user_input_line_ids if
|
||||||
|
user_input_line.question_id == field_value.question_id]
|
||||||
|
|
||||||
|
if not user_input_lines:
|
||||||
|
# If the question has not been displayed to the user, there are no user_input_lines
|
||||||
|
return None
|
||||||
|
if user_input_lines[0].skipped:
|
||||||
|
# The question has been ignored by the user
|
||||||
|
return None
|
||||||
|
|
||||||
|
question_type = field_value.question_id.question_type
|
||||||
|
|
||||||
|
if question_type in ["char_box", "text_box", "numerical_box", "date", "datetime"]:
|
||||||
|
return user_input_lines[0][f"value_{user_input_lines[0].answer_type}"]
|
||||||
|
elif question_type in ["simple_choice", "multiple_choice", "matrix"]:
|
||||||
|
answer_values_type = field_value.question_id.answer_values_type
|
||||||
|
return self.get_value_based_on_answer_values_type(answer_values_type, field_value, question_type,
|
||||||
|
user_input_lines)
|
||||||
|
else:
|
||||||
|
raise UserError(
|
||||||
|
"[Survey record generation] The question type %s is not recognized (for question %s)." % question_type,
|
||||||
|
field_value.question_id.title
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_value_based_on_answer_values_type(self, answer_values_type, field_value,
|
||||||
|
question_type: Literal["simple_choice", "multiple_choice", "matrix"],
|
||||||
|
user_input_lines: list[Any]) -> Any:
|
||||||
|
if answer_values_type == "record":
|
||||||
|
answered_record_ids = []
|
||||||
|
for user_input_line in user_input_lines:
|
||||||
|
if user_input_line.suggested_answer_id and user_input_line.suggested_answer_id.record_id:
|
||||||
|
answered_record_ids.append(user_input_line.suggested_answer_id.record_id.id)
|
||||||
|
if not answered_record_ids:
|
||||||
|
return None
|
||||||
|
if question_type == "simple_choice":
|
||||||
|
return answered_record_ids[0]
|
||||||
|
elif question_type == "multiple_choice":
|
||||||
|
return answered_record_ids
|
||||||
|
else:
|
||||||
|
raise UserError(
|
||||||
|
"[Survey record generation] The question type %s is not supported yet." % question_type)
|
||||||
|
elif answer_values_type == "value":
|
||||||
|
if field_value.field_id.ttype == "boolean":
|
||||||
|
boolean_value = user_input_lines[0].suggested_answer_id.value_char in [True, 1, "1", "True", "true",
|
||||||
|
"Oui", "oui", "Yes", "yes"]
|
||||||
|
return boolean_value
|
||||||
|
else:
|
||||||
|
return user_input_lines[0].suggested_answer_id.value_char
|
||||||
|
else:
|
||||||
|
raise UserError(
|
||||||
|
"[Survey record generation] The answer values type %s is not supported (for question %s)." % answer_values_type,
|
||||||
|
field_value.question_id.title
|
||||||
|
)
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
* `Elabore <https://www.elabore.coop>`_
|
* `Elabore <https://www.elabore.coop>`_
|
||||||
|
|
||||||
* Clément Thomas
|
* Clément Thomas
|
||||||
|
* Quentin Mondot
|
||||||
|
|||||||
Reference in New Issue
Block a user