-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathpg_python.py
More file actions
136 lines (125 loc) · 3.41 KB
/
pg_python.py
File metadata and controls
136 lines (125 loc) · 3.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
##
# .bin.pg_python - Python console with a connection.
##
"""
Python command with a PG-API connection(``db``).
"""
import os
import sys
import re
import code
import optparse
import contextlib
from .. import clientparameters
from ..python import command as pycmd
from .. import project
from ..driver import default as pg_driver
from .. import exceptions as pg_exc
from .. import sys as pg_sys
from .. import lib as pg_lib
# --pq-trace <file>: stored on the parsed options as `pq_trace`; command()
# opens the named file for appending and uses its write method as the
# connection's tracer.
pq_trace = optparse.Option(
    '--pq-trace',
    default = None,
    dest = 'pq_trace',
    help = 'trace PQ protocol transmissions',
)
# Option list given to the parser built in command(): the switches declared
# for pg_python come first, followed by the options exported by pycmd.
default_options = [
    pq_trace, clientparameters.option_lib, clientparameters.option_libpath,
] + pycmd.default_optparse_options
def command(argv = sys.argv):
    """
    Run the pg_python command.

    Parse the connection options found in `argv`, establish a connection,
    publish the connection and a few of its attributes as builtins(``db``,
    ``connector``, ``do``, ``prepare``, ``sqlexec``, ``settings``, ``proc``,
    ``xact``), and run the requested Python execution with them in place.

    `argv` is the complete argument vector; ``argv[0]`` is skipped. The
    arguments left over after option parsing are given to `pycmd.Execution`.

    Returns the value returned by the `pycmd.Execution` call.

    Raises `SystemExit` with ``1`` when `resolve_password` raises `EOFError`,
    and re-raises `ClientCannotConnectError` when none of the recorded
    connection failures is an `AuthenticationSpecificationError`.
    """
    p = clientparameters.DefaultParser(
        "%prog [connection options] [script] ...",
        version = project.version,
        option_list = default_options
    )
    # Stop option parsing at the first positional argument so that everything
    # after it is left in `ca`.
    p.disable_interspersed_args()
    co, ca = p.parse_args(argv[1:])
    rv = 1

    # Resolve the category.
    pg_sys.libpath.insert(0, os.path.curdir)
    pg_sys.libpath.extend(co.libpath or [])
    if co.lib:
        cat = pg_lib.Category(*map(pg_lib.load, co.lib))
    else:
        cat = None

    trace_file = None
    if co.pq_trace is not None:
        trace_file = open(co.pq_trace, 'a')

    try:
        need_prompt = False
        connector = None
        connection = None
        # Loop until a connection is established; an authentication
        # specification failure resets `connection` and forces a prompt.
        while connection is None:
            try:
                cond = clientparameters.collect(parsed_options = co, prompt_title = None)
                if need_prompt:
                    # authspec error thrown last time, so force prompt.
                    cond['prompt_password'] = True
                try:
                    clientparameters.resolve_password(cond, prompt_title = 'pg_python')
                except EOFError:
                    raise SystemExit(1)
                connector = pg_driver.fit(category = cat, **cond)
                connection = connector()
                if trace_file is not None:
                    connection.tracer = trace_file.write
                connection.connect()
            except pg_exc.ClientCannotConnectError:
                if connection is None:
                    # Raised before a connection object existed; there are no
                    # recorded failures to inspect.
                    raise
                for att in connection.failures:
                    exc = att.error
                    if isinstance(exc, pg_exc.AuthenticationSpecificationError):
                        sys.stderr.write(os.linesep + exc.message + (os.linesep*2))
                        # keep prompting the user
                        need_prompt = True
                        connection = None
                        break
                else:
                    # no invalid password failures..
                    raise

        pythonexec = pycmd.Execution(ca,
            context = getattr(co, 'python_context', None),
            loader = getattr(co, 'python_main', None),
        )

        builtin_overload = {
            # New built-ins
            'connector' : connector,
            'db' : connection,
            'do' : connection.do,
            'prepare' : connection.prepare,
            'sqlexec' : connection.execute,
            'settings' : connection.settings,
            'proc' : connection.proc,
            'xact' : connection.xact,
        }
        # __builtins__ is either the builtins module or its dictionary.
        if not isinstance(__builtins__, dict):
            builtins_d = __builtins__.__dict__
        else:
            builtins_d = __builtins__

        # Remember what each overloaded name was bound to. `absent` marks the
        # names that did not exist so they are removed afterwards instead of
        # being left behind bound to None.
        absent = object()
        restore = {k : builtins_d.get(k, absent) for k in builtin_overload}
        builtins_d.update(builtin_overload)
        try:
            with connection:
                rv = pythonexec(
                    context = pycmd.postmortem(os.environ.get('PYTHON_POSTMORTEM'))
                )
            # NOTE(review): nesting reconstructed from a copy of the file that
            # lost its indentation -- confirm this check belongs after the
            # `with` block rather than inside it.
            exc = getattr(sys, 'last_type', None)
            if rv and exc and not issubclass(exc, Exception):
                # Don't try to close it if wasn't an Exception.
                del connection.pq.socket
        finally:
            # restore __builtins__
            for k, v in restore.items():
                if v is absent:
                    builtins_d.pop(k, None)
                else:
                    builtins_d[k] = v
    except BaseException:
        # The current directory entry is only taken back out of the libpath
        # when the command fails.
        pg_sys.libpath.remove(os.path.curdir)
        raise
    finally:
        # Close the trace file on every exit path, including failures that
        # happen before the Python execution starts.
        if trace_file is not None:
            trace_file.close()
    return rv
# Script entry point: run the command and use its return value as the
# process exit status.
if __name__ == '__main__':
    raise SystemExit(command(sys.argv))