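The ORM offers the following search-related methods. Their main uses are:
search(): called by the search view
search_count(): called when a view needs the number of matching records
name_search(): called when searching inside a many2one field
search_read(): called when "Search More..." is opened from a many2one field
read_group(): called when the search view groups records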
search()
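The search method involves several internal steps and options:
It looks up the records that satisfy the given domain.
The active field gets special treatment: records with active=False are filtered out by default, and passing active_test=False in the context bypasses that filter.
The count parameter returns the number of matching records directly, so search_count is not needed.
The _uniquify_list helper removes duplicate ids, so an id that the query returns more than once appears only once in the result.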
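The deduplication in the last point can be illustrated with a small standalone sketch. The function name `uniquify` is hypothetical and this is not Odoo's source. It only assumes the behaviour described above, plus the assumption that the first occurrence of each id keeps its position.

```python
def uniquify(ids):
    """Return ids without duplicates, keeping the order of first occurrence."""
    seen = set()
    unique = []
    for record_id in ids:
        if record_id not in seen:
            seen.add(record_id)
            unique.append(record_id)
    return unique


# A query with joins can return the same id more than once (made-up sample).
raw_ids = [7, 3, 7, 5, 3]
unique_ids = uniquify(raw_ids)
```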
search_count()
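Returns the number of records that match the domain. Passing count=True to search() gives the same number directly. Either way the database does the counting, which is faster than fetching the records and taking their length.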
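The difference between counting in the database and counting fetched rows can be sketched outside Odoo. The example below uses Python's built-in sqlite3 module and a made-up `partner` table. Both are assumptions for illustration only, since Odoo itself runs on PostgreSQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE partner (id INTEGER PRIMARY KEY, active INTEGER)")
conn.executemany(
    "INSERT INTO partner (active) VALUES (?)",
    [(1,), (1,), (0,), (1,)],
)

# Counting in the database: the query returns a single integer.
(count_in_db,) = conn.execute(
    "SELECT count(1) FROM partner WHERE active = 1"
).fetchone()

# Counting in Python: every matching id is fetched first, then counted.
ids = [row[0] for row in conn.execute("SELECT id FROM partner WHERE active = 1")]
count_in_python = len(ids)

conn.close()
```

Both variables hold the same number. The first approach simply avoids transferring the ids.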
search_read()
Runs search first, then read.
@api.model
def search_read(self, domain=None, fields=None, offset=0, limit=None, order=None):
    """
    Performs a ``search()`` followed by a ``read()``.

    :param domain: Search domain, see ``args`` parameter in ``search()``. Defaults to an empty domain that will match all records.
    :param fields: List of fields to read, see ``fields`` parameter in ``read()``. Defaults to all fields.
    :param offset: Number of records to skip, see ``offset`` parameter in ``search()``. Defaults to 0.
    :param limit: Maximum number of records to return, see ``limit`` parameter in ``search()``. Defaults to no limit.
    :param order: Columns to sort result, see ``order`` parameter in ``search()``. Defaults to no sort.
    :return: List of dictionaries containing the asked fields.
    :rtype: List of dictionaries.
    """
    records = self.search(domain or [], offset=offset, limit=limit, order=order)
    if not records:
        return []
    if fields and fields == ['id']:
        # shortcut read if we only want the ids
        return [{'id': record.id} for record in records]
    # read() ignores active_test, but it would forward it to any downstream search call
    # (e.g. for x2m or function fields), and this is not the desired behavior, the flag
    # was presumably only meant for the main search().
    # TODO: Move this to read() directly?
    if 'active_test' in self._context:
        context = dict(self._context)
        del context['active_test']
        records = records.with_context(context)
    result = records.read(fields)
    if len(result) <= 1:
        return result
    # reorder read
    index = {vals['id']: vals for vals in result}
    return [index[record.id] for record in records if record.id in index]
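The final reordering step of search_read can be tried in isolation. In this sketch plain lists and dicts stand in for the recordset and for the output of read(). The function name `reorder_like_search` and the sample data are made up.

```python
def reorder_like_search(search_ids, read_rows):
    """Return read_rows in the order of search_ids, skipping ids with no row."""
    index = {row["id"]: row for row in read_rows}
    return [index[record_id] for record_id in search_ids if record_id in index]


# Order decided by the search step (made-up sample).
search_ids = [30, 10, 20]
# Rows in a different order; in this sample id 20 has no row at all.
read_rows = [
    {"id": 10, "name": "b"},
    {"id": 30, "name": "a"},
]
ordered = reorder_like_search(search_ids, read_rows)
```

Indexing the rows by id first keeps the reordering linear in the number of records instead of scanning the row list once per id.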
name_search()
It also runs its query through the _search method.
@api.model
def name_search(self, name='', args=None, operator='ilike', limit=100):
    """ name_search(name='', args=None, operator='ilike', limit=100) -> records

    Search for records that have a display name matching the given
    ``name`` pattern when compared with the given ``operator``, while also
    matching the optional search domain (``args``).

    This is used for example to provide suggestions based on a partial
    value for a relational field. Sometimes be seen as the inverse
    function of :meth:`~.name_get`, but it is not guaranteed to be.

    This method is equivalent to calling :meth:`~.search` with a search
    domain based on ``display_name`` and then :meth:`~.name_get` on the
    result of the search.

    :param str name: the name pattern to match
    :param list args: optional search domain (see :meth:`~.search` for
                      syntax), specifying further restrictions
    :param str operator: domain operator for matching ``name``, such as
                         ``'like'`` or ``'='``.
    :param int limit: optional max number of records to return
    :rtype: list
    :return: list of pairs ``(id, text_repr)`` for all matching records.
    """
    return self._name_search(name, args, operator, limit=limit)
@api.model
def _name_search(self, name='', args=None, operator='ilike', limit=100, name_get_uid=None):
    # private implementation of name_search, allows passing a dedicated user
    # for the name_get part to solve some access rights issues
    args = list(args or [])
    # optimize out the default criterion of ``ilike ''`` that matches everything
    if not self._rec_name:
        _logger.warning("Cannot execute name_search, no _rec_name defined on %s", self._name)
    elif not (name == '' and operator == 'ilike'):
        args += [(self._rec_name, operator, name)]
    access_rights_uid = name_get_uid or self._uid
    ids = self._search(args, limit=limit, access_rights_uid=access_rights_uid)
    recs = self.browse(ids)
    return lazy_name_get(recs.sudo(access_rights_uid))
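The way _name_search extends the domain before searching can be sketched without Odoo. The helper `build_name_domain` and the sample field names are hypothetical. Unlike the real method, the sketch returns the domain unchanged when no record name is given, instead of logging a warning.

```python
def build_name_domain(rec_name, name="", args=None, operator="ilike"):
    """Mimic how the domain is extended before the search is run."""
    domain = list(args or [])  # copy, so the caller's list is not modified
    # ``ilike ''`` matches everything, so that criterion is left out.
    if rec_name and not (name == "" and operator == "ilike"):
        domain.append((rec_name, operator, name))
    return domain


base = [("customer", "=", True)]
with_name = build_name_domain("name", "acme", base)
match_all = build_name_domain("name", "", base)
```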
read_group()
Used when grouping data.
@api.model
def read_group(self, domain, fields, groupby, offset=0, limit=None, orderby=False, lazy=True):
    """
    Get the list of records in list view grouped by the given ``groupby`` fields

    :param domain: list specifying search criteria [['field_name', 'operator', 'value'], ...]
    :param list fields: list of fields present in the list view specified on the object.
            Each element is either 'field' (field name, using the default aggregation),
            or 'field:agg' (aggregate field with aggregation function 'agg'),
            or 'name:agg(field)' (aggregate field with 'agg' and return it as 'name').
            The possible aggregation functions are the ones provided by PostgreSQL
            (https://www.postgresql.org/docs/current/static/functions-aggregate.html)
            and 'count_distinct', with the expected meaning.
    :param list groupby: list of groupby descriptions by which the records will be grouped.
            A groupby description is either a field (then it will be grouped by that field)
            or a string 'field:groupby_function'. Right now, the only functions supported
            are 'day', 'week', 'month', 'quarter' or 'year', and they only make sense for
            date/datetime fields.
    :param int offset: optional number of records to skip
    :param int limit: optional max number of records to return
    :param list orderby: optional ``order by`` specification, for
                         overriding the natural sort ordering of the
                         groups, see also :py:meth:`~osv.osv.osv.search`
                         (supported only for many2one fields currently)
    :param bool lazy: if true, the results are only grouped by the first groupby and the
            remaining groupbys are put in the __context key. If false, all the groupbys are
            done in one call.
    :return: list of dictionaries(one dictionary for each record) containing:
                * the values of fields grouped by the fields in ``groupby`` argument
                * __domain: list of tuples specifying the search criteria
                * __context: dictionary with argument like ``groupby``
    :rtype: [{'field_name_1': value, ...]
    :raise AccessError: * if user has no read rights on the requested object
                        * if user tries to bypass access rules for read on the requested object
    """
    result = self._read_group_raw(domain, fields, groupby, offset=offset, limit=limit, orderby=orderby, lazy=lazy)
    groupby = [groupby] if isinstance(groupby, pycompat.string_types) else list(OrderedSet(groupby))
    dt = [
        f for f in groupby
        if self._fields[f.split(':')[0]].type in ('date', 'datetime')    # e.g. 'date:month'
    ]
    # iterate on all results and replace the "full" date/datetime value
    # (range, label) by just the formatted label, in-place
    for group in result:
        for df in dt:
            # could group on a date(time) field which is empty in some
            # records, in which case as with m2o the _raw value will be
            # `False` instead of a (value, label) pair. In that case,
            # leave the `False` value alone
            if group.get(df):
                group[df] = group[df][1]
    return result
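The post-processing that read_group applies to date groups can be sketched with plain dictionaries. The function name `flatten_date_labels` and the sample values are made up, and the sketch assumes the caller passes only date or datetime groupby keys, whereas the real method works that out from the field types.

```python
def flatten_date_labels(groups, date_groupbys):
    """Replace (range, label) pairs of the given groupby keys by the label."""
    for group in groups:
        for key in date_groupbys:
            # An empty date yields False instead of a pair; leave it alone.
            if group.get(key):
                group[key] = group[key][1]
    return groups


# Made-up raw result: one group with a (range, label) pair, one empty group.
raw = [
    {"date:month": ("2020-04-01/2020-05-01", "April 2020"), "date_count": 3},
    {"date:month": False, "date_count": 1},
]
flat = flatten_date_labels(raw, ["date:month"])
```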
@api.model
def _read_group_raw(self, domain, fields, groupby, offset=0, limit=None, orderby=False, lazy=True):
    self.check_access_rights('read')
    # turn the domain into a SQL query
    query = self._where_calc(domain)
    # default to the fields that are stored in the database
    fields = fields or [f.name for f in self._fields.values() if f.store]
    groupby = [groupby] if isinstance(groupby, pycompat.string_types) else list(OrderedSet(groupby))
    groupby_list = groupby[:1] if lazy else groupby
    annotated_groupbys = [self._read_group_process_groupby(gb, query) for gb in groupby_list]
    groupby_fields = [g['field'] for g in annotated_groupbys]
    order = orderby or ','.join([g for g in groupby_list])
    groupby_dict = {gb['groupby']: gb for gb in annotated_groupbys}
    self._apply_ir_rules(query, 'read')
    for gb in groupby_fields:
        assert gb in self._fields, "Unknown field %r in 'groupby'" % gb
        gb_field = self._fields[gb].base_field
        assert gb_field.store and gb_field.column_type, "Fields in 'groupby' must be regular database-persisted fields (no function or related fields), or function fields with store=True"
    aggregated_fields = []
    select_terms = []
    for fspec in fields:
        if fspec == 'sequence':
            continue
        match = regex_field_agg.match(fspec)
        if not match:
            raise UserError(_("Invalid field specification %r.") % fspec)
        name, func, fname = match.groups()
        if func:
            # we have either 'name:func' or 'name:func(fname)'
            fname = fname or name
            field = self._fields[fname]
            if not (field.base_field.store and field.base_field.column_type):
                raise UserError(_("Cannot aggregate field %r.") % fname)
            if not func.isidentifier():
                raise UserError(_("Invalid aggregation function %r.") % func)
        else:
            # we have 'name', retrieve the aggregator on the field
            field = self._fields.get(name)
            if not (field and field.base_field.store and
                    field.base_field.column_type and field.group_operator):
                continue
            func, fname = field.group_operator, name
        if fname in groupby_fields:
            continue
        if name in aggregated_fields:
            raise UserError(_("Output name %r is used twice.") % name)
        aggregated_fields.append(name)
        expr = self._inherits_join_calc(self._table, fname, query)
        if func.lower() == 'count_distinct':
            term = 'COUNT(DISTINCT %s) AS "%s"' % (expr, name)
        else:
            term = '%s(%s) AS "%s"' % (func, expr, name)
        select_terms.append(term)
    for gb in annotated_groupbys:
        select_terms.append('%s as "%s" ' % (gb['qualified_field'], gb['groupby']))
    groupby_terms, orderby_terms = self._read_group_prepare(order, aggregated_fields, annotated_groupbys, query)
    from_clause, where_clause, where_clause_params = query.get_sql()
    if lazy and (len(groupby_fields) >= 2 or not self._context.get('group_by_no_leaf')):
        count_field = groupby_fields[0] if len(groupby_fields) >= 1 else '_'
    else:
        count_field = '_'
    count_field += '_count'
    prefix_terms = lambda prefix, terms: (prefix + " " + ",".join(terms)) if terms else ''
    prefix_term = lambda prefix, term: ('%s %s' % (prefix, term)) if term else ''
    query = """
        SELECT min("%(table)s".id) AS id, count("%(table)s".id) AS "%(count_field)s" %(extra_fields)s
        FROM %(from)s
        %(where)s
        %(groupby)s
        %(orderby)s
        %(limit)s
        %(offset)s
    """ % {
        'table': self._table,
        'count_field': count_field,
        'extra_fields': prefix_terms(',', select_terms),
        'from': from_clause,
        'where': prefix_term('WHERE', where_clause),
        'groupby': prefix_terms('GROUP BY', groupby_terms),
        'orderby': prefix_terms('ORDER BY', orderby_terms),
        'limit': prefix_term('LIMIT', int(limit) if limit else None),
        'offset': prefix_term('OFFSET', int(offset) if limit else None),
    }
    self._cr.execute(query, where_clause_params)
    fetched_data = self._cr.dictfetchall()
    if not groupby_fields:
        return fetched_data
    self._read_group_resolve_many2one_fields(fetched_data, annotated_groupbys)
    data = [{k: self._read_group_prepare_data(k, v, groupby_dict) for k, v in r.items()} for r in fetched_data]
    if self.env.context.get('fill_temporal') and data:
        data = self._read_group_fill_temporal(data, groupby, aggregated_fields,
                                              annotated_groupbys)
    result = [self._read_group_format_result(d, annotated_groupbys, groupby, domain) for d in data]
    if lazy:
        # Right now, read_group only fill results in lazy mode (by default).
        # If you need to have the empty groups in 'eager' mode, then the
        # method _read_group_fill_results need to be completely reimplemented
        # in a sane way
        result = self._read_group_fill_results(
            domain, groupby_fields[0], groupby[len(annotated_groupbys):],
            aggregated_fields, count_field, result, read_group_order=order,
        )
    return result
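How a field specification such as 'field', 'field:agg' or 'name:agg(field)' becomes a SQL select term can be sketched on its own. Everything here is an assumption made for illustration: the `FIELD_SPEC` pattern is written for this example and is not claimed to be Odoo's regex_field_agg, the default aggregator "sum" stands in for the field's group_operator, and the table and field names are made up.

```python
import re

# Hypothetical pattern for 'name', 'name:func' and 'name:func(fname)'.
FIELD_SPEC = re.compile(r"^(\w+)(?::(\w+)(?:\((\w+)\))?)?$")


def select_term(spec, table="sale_order"):
    """Turn one field specification into a SQL select term."""
    match = FIELD_SPEC.match(spec)
    if not match:
        raise ValueError("Invalid field specification %r" % spec)
    name, func, fname = match.groups()
    if not func:
        func = "sum"  # stand-in for the field's default aggregator
    fname = fname or name
    expr = '"%s"."%s"' % (table, fname)
    if func.lower() == "count_distinct":
        return 'COUNT(DISTINCT %s) AS "%s"' % (expr, name)
    return '%s(%s) AS "%s"' % (func, expr, name)


specs = ("amount", "amount:avg", "buyers:count_distinct(partner_id)")
terms = [select_term(spec) for spec in specs]
```

The third specification shows why the output name and the aggregated field are kept apart: the result column is called buyers, while the aggregation runs over partner_id.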
Original link: https://blog.csdn.net/tsoTeo/article/details/105728776