-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathfactory.py
More file actions
164 lines (132 loc) · 5.84 KB
/
factory.py
File metadata and controls
164 lines (132 loc) · 5.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
Copyright 2006-2008 SpringSource (http://springsource.com), All Rights Reserved
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import logging
import re
import sys
import types
class ConnectionFactory(object):
    """Base class for objects that are able to make database connections.

    This allows database connections to be defined inside application
    contexts, and fed to DAO and DatabaseTemplates.

    The connection is created lazily by the first getConnection() call and
    is reused until close() is called. Subclasses are expected to override
    connect(), in_transaction() and count_type(), which raise
    NotImplementedError here.
    """

    def __init__(self, acceptable_types):
        """Remember acceptable_types and start without an open connection.

        acceptable_types is stored unchanged on the instance; this class
        does not interpret it.
        """
        self.__db = None
        self.acceptable_types = acceptable_types

    def connect(self):
        """Open and return a new database connection (subclass hook)."""
        raise NotImplementedError()

    def getConnection(self):
        """Return the cached connection, calling connect() on first use."""
        if self.__db is None:
            self.__db = self.connect()
        return self.__db

    def close(self):
        """Need to offer API call to close the connection to the database.

        Does nothing when no connection has been opened yet.
        """
        if self.__db is not None:
            self.__db.close()
            self.__db = None

    def commit(self):
        """Commit on the cached connection when in_transaction() is true."""
        if self.in_transaction():
            self.getConnection().commit()

    def rollback(self):
        """Roll back on the cached connection when in_transaction() is true."""
        if self.in_transaction():
            self.getConnection().rollback()

    def in_transaction(self):
        """Tell whether commit()/rollback() should be issued (subclass hook)."""
        raise NotImplementedError()

    def count_type(self):
        """Return the type this factory reports for counts (subclass hook)."""
        raise NotImplementedError()

    def convert_sql_binding(self, sql_query):
        """This is to help Java users migrate to Python. Java notation defines binding variables
        points with '?', while Python uses '%s', and this method will convert from one format
        to the other.

        Every '?' in sql_query is replaced, including any that appear
        inside quoted SQL literals.
        """
        # Raw string: "\?" in a normal literal is an invalid escape sequence.
        return re.sub(pattern=r"\?", repl="%s", string=sql_query)
class MySQLConnectionFactory(ConnectionFactory):
    """Connection factory that opens connections through the MySQLdb module."""

    def __init__(self, username=None, password=None, hostname=None, db=None):
        """Store the MySQL connection settings; no connection is opened here."""
        # The builtin `tuple` is the same object as Python 2's
        # types.TupleType, and unlike that alias it also exists on Python 3.
        ConnectionFactory.__init__(self, [tuple])
        self.username = username
        self.password = password
        self.hostname = hostname
        self.db = db

    def connect(self):
        """The import statement is delayed so the library is loaded ONLY if this factory is really used."""
        import MySQLdb
        return MySQLdb.connect(self.hostname, self.username, self.password, self.db)

    def in_transaction(self):
        """Always report an active transaction, so commit()/rollback() are issued."""
        return True

    def count_type(self):
        """Return the type this factory reports for counts.

        That is types.LongType where it exists (Python 2); otherwise int,
        since Python 3 has no separate long type.
        """
        return getattr(types, "LongType", int)
class PgdbConnectionFactory(ConnectionFactory):
    """Connection factory that opens connections through the pgdb module."""

    def __init__(self, user=None, password=None, host=None, database=None):
        """Store the connection settings; no connection is opened here."""
        # The builtin `tuple` is the same object as Python 2's
        # types.TupleType, and unlike that alias it also exists on Python 3.
        ConnectionFactory.__init__(self, [tuple])
        self.user = user
        self.password = password
        self.host = host
        self.database = database

    def connect(self):
        """The import statement is delayed so the library is loaded ONLY if this factory is really used."""
        import pgdb
        return pgdb.connect(
            user=self.user,
            password=self.password,
            database=self.database,
            host=self.host,
        )

    def in_transaction(self):
        """Always report an active transaction, so commit()/rollback() are issued."""
        return True

    def count_type(self):
        """Return the type this factory reports for counts.

        That is types.LongType where it exists (Python 2); otherwise int,
        since Python 3 has no separate long type.
        """
        return getattr(types, "LongType", int)
class Sqlite3ConnectionFactory(ConnectionFactory):
    """Connection factory for SQLite.

    Uses the sqlite3 module when it can be imported and falls back to the
    older sqlite module otherwise.
    """

    def __init__(self, db=None, check_same_thread=True):
        """Store the connection settings; no connection is opened here.

        db and check_same_thread are passed through to the driver's
        connect() call.
        """
        # The builtin `tuple` is the same object as Python 2's
        # types.TupleType, and unlike that alias it also exists on Python 3.
        ConnectionFactory.__init__(self, [tuple])
        self.db = db
        self.check_same_thread = check_same_thread
        # Assumed True until connect() finds that sqlite3 is unavailable.
        self.using_sqlite3 = True

    def connect(self):
        """The import statement is delayed so the library is loaded ONLY if this factory is really used.

        Only a failed import of sqlite3 triggers the fallback to the older
        sqlite module; errors raised while connecting propagate unchanged
        instead of being hidden behind the fallback attempt.
        """
        try:
            import sqlite3
        except ImportError:
            import sqlite
            self.using_sqlite3 = False
            return sqlite.connect(self.db, check_same_thread=self.check_same_thread)
        return sqlite3.connect(self.db, check_same_thread=self.check_same_thread)

    def in_transaction(self):
        """Always report an active transaction, so commit()/rollback() are issued."""
        return True

    def count_type(self):
        """Return int, the type this factory reports for counts."""
        return int

    def convert_sql_binding(self, sql_query):
        """Rewrite binding placeholders into the notation of the driver in use.

        Depends on self.using_sqlite3, which is True until connect() has
        fallen back to the older sqlite module.
        """
        if self.using_sqlite3:
            # sqlite3 uses the ? notation, like Java's JDBC.
            return re.sub(pattern="%s", repl="?", string=sql_query)
        # Older versions of sqlite use the %s notation.
        return re.sub(pattern=r"\?", repl="%s", string=sql_query)
class cxoraConnectionFactory(ConnectionFactory):
    """Connection factory that opens connections through the cx_Oracle module."""

    # NOTE(review): in_transaction() and count_type() are not overridden
    # here, so the inherited commit()/rollback() raise NotImplementedError
    # for this factory -- confirm whether that is intended.

    def __init__(self, username=None, password=None, hostname=None, db=None):
        """Store the Oracle connection settings; no connection is opened here."""
        # The builtin `dict` is the same object as Python 2's
        # types.DictType, and unlike that alias it also exists on Python 3.
        ConnectionFactory.__init__(self, [dict])
        self.username = username
        self.password = password
        # NOTE(review): hostname is stored but connect() never uses it.
        self.hostname = hostname
        self.db = db

    def connect(self):
        """The import statement is delayed so the library is loaded ONLY if this factory is really used."""
        import cx_Oracle
        return cx_Oracle.connect(self.username, self.password, self.db)
class SQLServerConnectionFactory(ConnectionFactory):
    """Connection factory that opens connections through the pyodbc module."""

    def __init__(self, **odbc_info):
        """Store the keyword arguments that connect() turns into the ODBC connection string."""
        # The builtin `tuple` is the same object as Python 2's
        # types.TupleType, and unlike that alias it also exists on Python 3.
        ConnectionFactory.__init__(self, [tuple])
        self.odbc_info = odbc_info

    def connect(self):
        """The import statement is delayed so the library is loaded ONLY if this factory is really used."""
        import pyodbc
        # Build "key=value;key=value" from the stored keyword arguments.
        connection_string = ";".join(
            "%s=%s" % (key, value) for key, value in self.odbc_info.items()
        )
        return pyodbc.connect(connection_string)

    def in_transaction(self):
        """Always report an active transaction, so commit()/rollback() are issued."""
        return True

    def count_type(self):
        """Return int, the type this factory reports for counts."""
        return int

    def convert_sql_binding(self, sql_query):
        """SQL Server expects parameters to be passed as question marks."""
        return re.sub(pattern="%s", repl="?", string=sql_query)