-
-
Notifications
You must be signed in to change notification settings - Fork 252
Expand file tree
/
Copy pathhistory.py
More file actions
234 lines (203 loc) · 8.12 KB
/
history.py
File metadata and controls
234 lines (203 loc) · 8.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# encoding: utf-8
# The MIT License
#
# Copyright (c) 2009 the bpython authors.
# Copyirhgt (c) 2012,2015 Sebastian Ramacher
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import unicode_literals
import io
import os
import stat
from itertools import islice
from six.moves import range
from bpython.translations import _
from bpython.filelock import FileLock
class History(object):
"""Stores readline-style history and current place in it"""
def __init__(self, entries=None, duplicates=True, hist_size=100):
if entries is None:
self.entries = ['']
else:
self.entries = list(entries)
# how many lines back in history is currently selected where 0 is the
# saved typed line, 1 the prev entered line
self.index = 0
# what was on the prompt before using history
self.saved_line = ''
self.duplicates = duplicates
self.hist_size = hist_size
def append(self, line):
self.append_to(self.entries, line)
def append_to(self, entries, line):
line = line.rstrip('\n')
if line:
if not self.duplicates:
# remove duplicates
try:
while True:
entries.remove(line)
except ValueError:
pass
entries.append(line)
def first(self):
"""Move back to the beginning of the history."""
if not self.is_at_end:
self.index = len(self.entries)
return self.entries[-self.index]
def back(self, start=True, search=False, target=None,
include_current=False):
"""Move one step back in the history."""
if target is None:
target = self.saved_line
if not self.is_at_end:
if search:
self.index += self.find_partial_match_backward(target,
include_current)
elif start:
self.index += self.find_match_backward(target, include_current)
else:
self.index += 1
return self.entry
@property
def entry(self):
"""The current entry, which may be the saved line"""
return self.entries[-self.index] if self.index else self.saved_line
@property
def entries_by_index(self):
return list(reversed(self.entries + [self.saved_line]))
def find_match_backward(self, search_term, include_current=False):
add = 0 if include_current else 1
start = self.index + add
for idx, val in enumerate(islice(self.entries_by_index, start, None)):
if val.startswith(search_term):
return idx + add
return 0
def find_partial_match_backward(self, search_term, include_current=False):
add = 0 if include_current else 1
start = self.index + add
for idx, val in enumerate(islice(self.entries_by_index, start, None)):
if search_term in val:
return idx + add
return 0
def forward(self, start=True, search=False, target=None,
include_current=False):
"""Move one step forward in the history."""
if target is None:
target = self.saved_line
if self.index > 1:
if search:
self.index -= self.find_partial_match_forward(target,
include_current)
elif start:
self.index -= self.find_match_forward(target, include_current)
else:
self.index -= 1
return self.entry
else:
self.index = 0
return self.saved_line
def find_match_forward(self, search_term, include_current=False):
add = 0 if include_current else 1
end = max(0, self.index - (1 - add))
for idx in range(end):
val = self.entries_by_index[end - 1 - idx]
if val.startswith(search_term):
return idx + (0 if include_current else 1)
return self.index
def find_partial_match_forward(self, search_term, include_current=False):
add = 0 if include_current else 1
end = max(0, self.index - (1 - add))
for idx in range(end):
val = self.entries_by_index[end - 1 - idx]
if search_term in val:
return idx + add
return self.index
def last(self):
"""Move forward to the end of the history."""
if not self.is_at_start:
self.index = 0
return self.entries[0]
@property
def is_at_end(self):
return self.index >= len(self.entries) or self.index == -1
@property
def is_at_start(self):
return self.index == 0
def enter(self, line):
if self.index == 0:
self.saved_line = line
@classmethod
def from_filename(cls, filename):
history = cls()
history.load(filename)
return history
def reset(self):
self.index = 0
self.saved_line = ''
def load(self, filename, encoding):
with io.open(filename, 'r', encoding=encoding,
errors='ignore') as hfile:
with FileLock(hfile):
self.entries = self.load_from(hfile)
def load_from(self, fd):
entries = []
for line in fd:
self.append_to(entries, line)
return entries
def save(self, filename, encoding, lines=0):
fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.TRUNC,
stat.S_IRUSR | stat.S_IWUSR)
with io.open(fd, 'w', encoding=encoding,
errors='ignore') as hfile:
with FileLock(hfile):
self.save_to(hfile, self.entries, lines)
def save_to(self, fd, entries=None, lines=0):
if entries is None:
entries = self.entries
for line in entries[-lines:]:
fd.write(line)
fd.write('\n')
def append_reload_and_write(self, s, filename, encoding):
if not self.hist_size:
return self.append(s)
try:
fd = os.open(filename, os.O_APPEND | os.O_RDWR | os.O_CREAT,
stat.S_IRUSR | stat.S_IWUSR)
with io.open(fd, 'a+', encoding=encoding,
errors='ignore') as hfile:
with FileLock(hfile):
# read entries
hfile.seek(0, os.SEEK_SET)
entries = self.load_from(hfile)
self.append_to(entries, s)
# write new entries
hfile.seek(0, os.SEEK_SET)
hfile.truncate()
self.save_to(hfile, entries, self.hist_size)
self.entries = entries
except EnvironmentError as err:
raise RuntimeError(
_('Error occurred while writing to file %s (%s)')
% (filename, err.strerror))
else:
if len(self.entries) == 0:
# Make sure that entries contains at least one element. If the
# file and s are empty, this can occur.
self.entries = ['']