-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathpg_python.py
More file actions
114 lines (106 loc) · 2.82 KB
/
pg_python.py
File metadata and controls
114 lines (106 loc) · 2.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
##
# copyright 2009, James William Pye
# http://python.projects.postgresql.org
##
"""
Python command with a PG-API connection(``db``).
"""
import os
import sys
import re
import code
import optparse
import contextlib
from .. import clientparameters
from ..resolved import pythoncommand as pycmd
from .. import __version__
from ..driver import default as pg_driver
from .. import exceptions as pg_exc
pq_trace = optparse.make_option(
'--pq-trace',
dest = 'pq_trace',
help = 'trace PQ protocol transmissions',
default = None,
)
default_options = [
pq_trace,
] + pycmd.default_optparse_options
def command(argv = sys.argv):
p = clientparameters.DefaultParser(
"%prog [connection options] [script] ...",
version = __version__,
option_list = default_options
)
p.disable_interspersed_args()
co, ca = p.parse_args(argv[1:])
need_prompt = False
cond = None
connector = None
connection = None
while connection is None:
try:
cond = clientparameters.collect(parsed_options = co, prompt_title = None)
if need_prompt:
# authspec error thrown last time, so force prompt.
cond['prompt_password'] = True
try:
clientparameters.resolve_password(cond, prompt_title = 'pg_python')
except EOFError:
raise SystemExit(1)
connector = pg_driver.fit(**cond)
connection = connector()
connection.connect()
except pg_exc.ClientCannotConnectError as err:
for (didssl, sc, cf) in err.connection_failures:
if isinstance(cf, pg_exc.AuthenticationSpecificationError):
sys.stderr.write(os.linesep + cf.message + (os.linesep*2))
# keep prompting the user
need_prompt = True
break
else:
# no invalid password failures..
raise
pythonexec = pycmd.Execution(ca,
context = getattr(co, 'python_context', None),
loader = getattr(co, 'python_main', None),
)
builtin_overload = {
# New built-ins
'connector' : connector,
'db' : connection,
'prepare' : connection.prepare,
'execute' : connection.execute,
'settings' : connection.settings,
'proc' : connection.proc,
'xact' : connection.xact,
}
if not isinstance(__builtins__, dict):
builtins_d = __builtins__.__dict__
else:
builtins_d = __builtins__
restore = {k : builtins_d.get(k) for k in builtin_overload}
trace_file = None
if co.pq_trace is not None:
trace_file = open(co.pq_trace, 'a')
builtins_d.update(builtin_overload)
try:
if trace_file is not None:
connection.tracer = trace_file.write
context = [connection]
with contextlib.nested(*context):
rv = pythonexec(
context = pycmd.postmortem(os.environ.get('PYTHON_POSTMORTEM'))
)
finally:
# restore __builtins__
builtins_d.update(restore)
for k, v in builtin_overload.items():
if v is None:
del builtins_d[x]
if trace_file is not None:
trace_file.close()
return rv
if __name__ == '__main__':
sys.exit(command(sys.argv))
##
# vim: ts=3:sw=3:noet: