forked from sqlobject/sqlobject
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathviews.py
More file actions
151 lines (127 loc) · 5.74 KB
/
views.py
File metadata and controls
151 lines (127 loc) · 5.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from .main import SQLObject
from .sqlbuilder import AND, Alias, ColumnAS, LEFTJOINOn, \
NoDefault, SQLCall, SQLConstant, SQLObjectField, SQLObjectTable, SQLOp, \
Select, sqlrepr
class ViewSQLObjectField(SQLObjectField):
def __init__(self, alias, *arg):
SQLObjectField.__init__(self, *arg)
self.alias = alias
def __sqlrepr__(self, db):
return self.alias + "." + self.fieldName
def tablesUsedImmediate(self):
return [self.tableName]
class ViewSQLObjectTable(SQLObjectTable):
FieldClass = ViewSQLObjectField
def __getattr__(self, attr):
if attr == 'sqlmeta':
raise AttributeError
return SQLObjectTable.__getattr__(self, attr)
def _getattrFromID(self, attr):
return self.FieldClass(self.soClass.sqlmeta.alias, self.tableName,
'id', attr, self.soClass, None)
def _getattrFromColumn(self, column, attr):
return self.FieldClass(self.soClass.sqlmeta.alias, self.tableName,
column.name, attr, self.soClass, column)
class ViewSQLObject(SQLObject):
"""
A SQLObject class that derives all it's values from other SQLObject
classes. Columns on subclasses should use SQLBuilder constructs for dbName,
and sqlmeta should specify:
* idName as a SQLBuilder construction
* clause as SQLBuilder clause for specifying join conditions
or other restrictions
* table as an optional alternate name for the class alias
See test_views.py for simple examples.
"""
def __classinit__(cls, new_attrs):
SQLObject.__classinit__(cls, new_attrs)
# like is_base
if cls.__name__ != 'ViewSQLObject':
dbName = hasattr(cls, '_connection') and \
(cls._connection and cls._connection.dbName) or None
if getattr(cls.sqlmeta, 'table', None):
cls.sqlmeta.alias = cls.sqlmeta.table
else:
cls.sqlmeta.alias = \
cls.sqlmeta.style.pythonClassToDBTable(cls.__name__)
alias = cls.sqlmeta.alias
columns = [ColumnAS(cls.sqlmeta.idName, 'id')]
# {sqlrepr-key: [restriction, *aggregate-column]}
aggregates = {'': [None]}
inverseColumns = dict(
[(y, x) for x, y in cls.sqlmeta.columns.items()])
for col in cls.sqlmeta.columnList:
n = inverseColumns[col]
ascol = ColumnAS(col.dbName, n)
if isAggregate(col.dbName):
restriction = getattr(col, 'aggregateClause', None)
if restriction:
restrictkey = sqlrepr(restriction, dbName)
aggregates[restrictkey] = \
aggregates.get(restrictkey, [restriction]) + \
[ascol]
else:
aggregates[''].append(ascol)
else:
columns.append(ascol)
metajoin = getattr(cls.sqlmeta, 'join', NoDefault)
clause = getattr(cls.sqlmeta, 'clause', NoDefault)
select = Select(columns,
distinct=True,
# @@ LDO check if this really mattered
# for performance
# @@ Postgres (and MySQL?) extension!
# distinctOn=cls.sqlmeta.idName,
join=metajoin,
clause=clause)
aggregates = aggregates.values()
if aggregates != [[None]]:
join = []
last_alias = "%s_base" % alias
last_id = "id"
last = Alias(select, last_alias)
columns = [
ColumnAS(SQLConstant("%s.%s" % (last_alias, x.expr2)),
x.expr2) for x in columns]
for i, agg in enumerate(aggregates):
restriction = agg[0]
if restriction is None:
restriction = clause
else:
restriction = AND(clause, restriction)
agg = agg[1:]
agg_alias = "%s_%s" % (alias, i)
agg_id = '%s_id' % agg_alias
if not last.q.alias.endswith('base'):
last = None
new_alias = Alias(Select(
[ColumnAS(cls.sqlmeta.idName, agg_id)] + agg,
groupBy=cls.sqlmeta.idName,
join=metajoin,
clause=restriction),
agg_alias)
agg_join = LEFTJOINOn(last, new_alias,
"%s.%s = %s.%s" % (
last_alias, last_id,
agg_alias, agg_id))
join.append(agg_join)
for col in agg:
columns.append(
ColumnAS(SQLConstant(
"%s.%s" % (agg_alias, col.expr2)),
col.expr2))
last = new_alias
last_alias = agg_alias
last_id = agg_id
select = Select(columns,
join=join)
cls.sqlmeta.table = Alias(select, alias)
cls.q = ViewSQLObjectTable(cls)
for n, col in cls.sqlmeta.columns.items():
col.dbName = n
def isAggregate(expr):
if isinstance(expr, SQLCall):
return True
if isinstance(expr, SQLOp):
return isAggregate(expr.expr1) or isAggregate(expr.expr2)
return False