@@ -172,6 +172,29 @@ def _create_and_install_waiters(fs, return_when):
172172
173173 return waiter
174174
175+
176+ def _yield_finished_futures (fs , waiter , ref_collect ):
177+ """
178+ Iterate on the list *fs*, yielding finished futures one by one in
179+ reverse order.
180+ Before yielding a future, *waiter* is removed from its waiters
181+ and the future is removed from each set in the collection of sets
182+ *ref_collect*.
183+
184+ The aim of this function is to avoid keeping stale references after
185+ the future is yielded and before the iterator resumes.
186+ """
187+ while fs :
188+ f = fs [- 1 ]
189+ for futures_set in ref_collect :
190+ futures_set .remove (f )
191+ with f ._condition :
192+ f ._waiters .remove (waiter )
193+ del f
194+ # Careful not to keep a reference to the popped value
195+ yield fs .pop ()
196+
197+
175198def as_completed (fs , timeout = None ):
176199 """An iterator over the given futures that yields each as it completes.
177200
@@ -194,16 +217,19 @@ def as_completed(fs, timeout=None):
194217 end_time = timeout + time .time ()
195218
196219 fs = set (fs )
220+ total_futures = len (fs )
197221 with _AcquireFutures (fs ):
198222 finished = set (
199223 f for f in fs
200224 if f ._state in [CANCELLED_AND_NOTIFIED , FINISHED ])
201225 pending = fs - finished
202226 waiter = _create_and_install_waiters (fs , _AS_COMPLETED )
203-
227+ finished = list ( finished )
204228 try :
205- for future in finished :
206- yield future
229+ for f in _yield_finished_futures (finished , waiter ,
230+ ref_collect = (fs ,)):
231+ f = [f ]
232+ yield f .pop ()
207233
208234 while pending :
209235 if timeout is None :
@@ -213,7 +239,7 @@ def as_completed(fs, timeout=None):
213239 if wait_timeout < 0 :
214240 raise TimeoutError (
215241 '%d (of %d) futures unfinished' % (
216- len (pending ), len ( fs ) ))
242+ len (pending ), total_futures ))
217243
218244 waiter .event .wait (wait_timeout )
219245
@@ -222,11 +248,15 @@ def as_completed(fs, timeout=None):
222248 waiter .finished_futures = []
223249 waiter .event .clear ()
224250
225- for future in finished :
226- yield future
227- pending .remove (future )
251+ # reverse to keep finishing order
252+ finished .reverse ()
253+ for f in _yield_finished_futures (finished , waiter ,
254+ ref_collect = (fs , pending )):
255+ f = [f ]
256+ yield f .pop ()
228257
229258 finally :
259+ # Remove waiter from unfinished futures
230260 for f in fs :
231261 with f ._condition :
232262 f ._waiters .remove (waiter )
@@ -600,11 +630,14 @@ def map(self, fn, *iterables, **kwargs):
600630 # before the first iterator value is required.
601631 def result_iterator ():
602632 try :
603- for future in fs :
633+ # reverse to keep finishing order
634+ fs .reverse ()
635+ while fs :
636+ # Careful not to keep a reference to the popped future
604637 if timeout is None :
605- yield future .result ()
638+ yield fs . pop () .result ()
606639 else :
607- yield future .result (end_time - time .time ())
640+ yield fs . pop () .result (end_time - time .time ())
608641 finally :
609642 for future in fs :
610643 future .cancel ()
0 commit comments