Skip to content

Commit 61f5abb

Browse files
committed
Backport fixes to as_completed and map iterators (bpo-27144)
Python issues: + python/cpython#1560 + python/cpython#3270 + python/cpython#3830
1 parent e8543e6 commit 61f5abb

File tree

1 file changed

+43
-10
lines changed

1 file changed

+43
-10
lines changed

concurrent/futures/_base.py

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,29 @@ def _create_and_install_waiters(fs, return_when):
172172

173173
return waiter
174174

175+
176+
def _yield_finished_futures(fs, waiter, ref_collect):
177+
"""
178+
Iterate on the list *fs*, yielding finished futures one by one in
179+
reverse order.
180+
Before yielding a future, *waiter* is removed from its waiters
181+
and the future is removed from each set in the collection of sets
182+
*ref_collect*.
183+
184+
The aim of this function is to avoid keeping stale references after
185+
the future is yielded and before the iterator resumes.
186+
"""
187+
while fs:
188+
f = fs[-1]
189+
for futures_set in ref_collect:
190+
futures_set.remove(f)
191+
with f._condition:
192+
f._waiters.remove(waiter)
193+
del f
194+
# Careful not to keep a reference to the popped value
195+
yield fs.pop()
196+
197+
175198
def as_completed(fs, timeout=None):
176199
"""An iterator over the given futures that yields each as it completes.
177200
@@ -194,16 +217,19 @@ def as_completed(fs, timeout=None):
194217
end_time = timeout + time.time()
195218

196219
fs = set(fs)
220+
total_futures = len(fs)
197221
with _AcquireFutures(fs):
198222
finished = set(
199223
f for f in fs
200224
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
201225
pending = fs - finished
202226
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
203-
227+
finished = list(finished)
204228
try:
205-
for future in finished:
206-
yield future
229+
for f in _yield_finished_futures(finished, waiter,
230+
ref_collect=(fs,)):
231+
f = [f]
232+
yield f.pop()
207233

208234
while pending:
209235
if timeout is None:
@@ -213,7 +239,7 @@ def as_completed(fs, timeout=None):
213239
if wait_timeout < 0:
214240
raise TimeoutError(
215241
'%d (of %d) futures unfinished' % (
216-
len(pending), len(fs)))
242+
len(pending), total_futures))
217243

218244
waiter.event.wait(wait_timeout)
219245

@@ -222,11 +248,15 @@ def as_completed(fs, timeout=None):
222248
waiter.finished_futures = []
223249
waiter.event.clear()
224250

225-
for future in finished:
226-
yield future
227-
pending.remove(future)
251+
# reverse to keep finishing order
252+
finished.reverse()
253+
for f in _yield_finished_futures(finished, waiter,
254+
ref_collect=(fs, pending)):
255+
f = [f]
256+
yield f.pop()
228257

229258
finally:
259+
# Remove waiter from unfinished futures
230260
for f in fs:
231261
with f._condition:
232262
f._waiters.remove(waiter)
@@ -600,11 +630,14 @@ def map(self, fn, *iterables, **kwargs):
600630
# before the first iterator value is required.
601631
def result_iterator():
602632
try:
603-
for future in fs:
633+
# reverse to keep finishing order
634+
fs.reverse()
635+
while fs:
636+
# Careful not to keep a reference to the popped future
604637
if timeout is None:
605-
yield future.result()
638+
yield fs.pop().result()
606639
else:
607-
yield future.result(end_time - time.time())
640+
yield fs.pop().result(end_time - time.time())
608641
finally:
609642
for future in fs:
610643
future.cancel()

0 commit comments

Comments
 (0)