itertools/
groupbylazy.rs

1use alloc::vec::{self, Vec};
2use std::cell::{Cell, RefCell};
3
/// Abstraction over "something that maps an element to its grouping key".
///
/// It unifies the user-supplied `FnMut` used by `GroupBy` with the
/// counter-based chunk key (`ChunkIndex`) used by `IntoChunks`.
trait KeyFunction<A> {
    /// Type of the key produced for each argument.
    type Key;

    /// Compute the key for `arg`. Takes `&mut self` so implementors may keep
    /// and update internal state between calls.
    fn call_mut(&mut self, arg: A) -> Self::Key;
}
9
10impl<A, K, F> KeyFunction<A> for F
11where
12    F: FnMut(A) -> K + ?Sized,
13{
14    type Key = K;
15    #[inline]
16    fn call_mut(&mut self, arg: A) -> Self::Key {
17        (*self)(arg)
18    }
19}
20
/// Grouping key function for `IntoChunks`: a counter that hands out the same
/// key for `size` consecutive elements and then moves on to the next key.
#[derive(Clone, Debug)]
struct ChunkIndex {
    /// Number of elements per chunk.
    size: usize,
    /// How many elements have been counted into the current chunk so far.
    index: usize,
    /// Key (running number) of the current chunk.
    key: usize,
}
28
29impl ChunkIndex {
30    #[inline(always)]
31    fn new(size: usize) -> Self {
32        Self {
33            size,
34            index: 0,
35            key: 0,
36        }
37    }
38}
39
40impl<A> KeyFunction<A> for ChunkIndex {
41    type Key = usize;
42    #[inline(always)]
43    fn call_mut(&mut self, _arg: A) -> Self::Key {
44        if self.index == self.size {
45            self.key += 1;
46            self.index = 0;
47        }
48        self.index += 1;
49        self.key
50    }
51}
52
/// Shared mutable state behind the lazy grouping adaptors: the source
/// iterator, the key function, and buffers for groups whose elements were
/// pulled from the source before their group iterator asked for them.
#[derive(Clone)]
struct GroupInner<K, I: Iterator, F> {
    /// Key function applied to each element pulled from `iter`.
    key: F,
    /// The underlying source iterator.
    iter: I,
    /// Key of the element examined most recently, if one is being tracked.
    current_key: Option<K>,
    /// An element already pulled from `iter` but not yet handed out.
    current_elt: Option<I::Item>,
    /// Flag set once `iter` is exhausted.
    done: bool,
    /// Index of the group we are currently buffering or visiting.
    top_group: usize,
    /// Least group index for which we still have elements buffered.
    oldest_buffered_group: usize,
    /// Group index stored at `buffer[0]`. The slots for
    /// `bottom_group..oldest_buffered_group` are unused and get erased once
    /// that range is large enough.
    bottom_group: usize,
    /// Buffered groups, from `bottom_group` (index 0) to `top_group`.
    buffer: Vec<vec::IntoIter<I::Item>>,
    /// Index of the last group iterator that was dropped;
    /// `usize::MAX` means none.
    dropped_group: usize,
}
77
78impl<K, I, F> GroupInner<K, I, F>
79where
80    I: Iterator,
81    F: for<'a> KeyFunction<&'a I::Item, Key = K>,
82    K: PartialEq,
83{
84    /// `client`: Index of group that requests next element
85    #[inline(always)]
86    fn step(&mut self, client: usize) -> Option<I::Item> {
87        /*
88        println!("client={}, bottom_group={}, oldest_buffered_group={}, top_group={}, buffers=[{}]",
89                 client, self.bottom_group, self.oldest_buffered_group,
90                 self.top_group,
91                 self.buffer.iter().map(|elt| elt.len()).format(", "));
92        */
93        if client < self.oldest_buffered_group {
94            None
95        } else if client < self.top_group
96            || (client == self.top_group && self.buffer.len() > self.top_group - self.bottom_group)
97        {
98            self.lookup_buffer(client)
99        } else if self.done {
100            None
101        } else if self.top_group == client {
102            self.step_current()
103        } else {
104            self.step_buffering(client)
105        }
106    }
107
108    #[inline(never)]
109    fn lookup_buffer(&mut self, client: usize) -> Option<I::Item> {
110        // if `bufidx` doesn't exist in self.buffer, it might be empty
111        let bufidx = client - self.bottom_group;
112        if client < self.oldest_buffered_group {
113            return None;
114        }
115        let elt = self.buffer.get_mut(bufidx).and_then(|queue| queue.next());
116        if elt.is_none() && client == self.oldest_buffered_group {
117            // FIXME: VecDeque is unfortunately not zero allocation when empty,
118            // so we do this job manually.
119            // `bottom_group..oldest_buffered_group` is unused, and if it's large enough, erase it.
120            self.oldest_buffered_group += 1;
121            // skip forward further empty queues too
122            while self
123                .buffer
124                .get(self.oldest_buffered_group - self.bottom_group)
125                .map_or(false, |buf| buf.len() == 0)
126            {
127                self.oldest_buffered_group += 1;
128            }
129
130            let nclear = self.oldest_buffered_group - self.bottom_group;
131            if nclear > 0 && nclear >= self.buffer.len() / 2 {
132                let mut i = 0;
133                self.buffer.retain(|buf| {
134                    i += 1;
135                    debug_assert!(buf.len() == 0 || i > nclear);
136                    i > nclear
137                });
138                self.bottom_group = self.oldest_buffered_group;
139            }
140        }
141        elt
142    }
143
144    /// Take the next element from the iterator, and set the done
145    /// flag if exhausted. Must not be called after done.
146    #[inline(always)]
147    fn next_element(&mut self) -> Option<I::Item> {
148        debug_assert!(!self.done);
149        match self.iter.next() {
150            None => {
151                self.done = true;
152                None
153            }
154            otherwise => otherwise,
155        }
156    }
157
158    #[inline(never)]
159    fn step_buffering(&mut self, client: usize) -> Option<I::Item> {
160        // requested a later group -- walk through the current group up to
161        // the requested group index, and buffer the elements (unless
162        // the group is marked as dropped).
163        // Because the `Groups` iterator is always the first to request
164        // each group index, client is the next index efter top_group.
165        debug_assert!(self.top_group + 1 == client);
166        let mut group = Vec::new();
167
168        if let Some(elt) = self.current_elt.take() {
169            if self.top_group != self.dropped_group {
170                group.push(elt);
171            }
172        }
173        let mut first_elt = None; // first element of the next group
174
175        while let Some(elt) = self.next_element() {
176            let key = self.key.call_mut(&elt);
177            match self.current_key.take() {
178                None => {}
179                Some(old_key) => {
180                    if old_key != key {
181                        self.current_key = Some(key);
182                        first_elt = Some(elt);
183                        break;
184                    }
185                }
186            }
187            self.current_key = Some(key);
188            if self.top_group != self.dropped_group {
189                group.push(elt);
190            }
191        }
192
193        if self.top_group != self.dropped_group {
194            self.push_next_group(group);
195        }
196        if first_elt.is_some() {
197            self.top_group += 1;
198            debug_assert!(self.top_group == client);
199        }
200        first_elt
201    }
202
203    fn push_next_group(&mut self, group: Vec<I::Item>) {
204        // When we add a new buffered group, fill up slots between oldest_buffered_group and top_group
205        while self.top_group - self.bottom_group > self.buffer.len() {
206            if self.buffer.is_empty() {
207                self.bottom_group += 1;
208                self.oldest_buffered_group += 1;
209            } else {
210                self.buffer.push(Vec::new().into_iter());
211            }
212        }
213        self.buffer.push(group.into_iter());
214        debug_assert!(self.top_group + 1 - self.bottom_group == self.buffer.len());
215    }
216
217    /// This is the immediate case, where we use no buffering
218    #[inline]
219    fn step_current(&mut self) -> Option<I::Item> {
220        debug_assert!(!self.done);
221        if let elt @ Some(..) = self.current_elt.take() {
222            return elt;
223        }
224        match self.next_element() {
225            None => None,
226            Some(elt) => {
227                let key = self.key.call_mut(&elt);
228                match self.current_key.take() {
229                    None => {}
230                    Some(old_key) => {
231                        if old_key != key {
232                            self.current_key = Some(key);
233                            self.current_elt = Some(elt);
234                            self.top_group += 1;
235                            return None;
236                        }
237                    }
238                }
239                self.current_key = Some(key);
240                Some(elt)
241            }
242        }
243    }
244
245    /// Request the just started groups' key.
246    ///
247    /// `client`: Index of group
248    ///
249    /// **Panics** if no group key is available.
250    fn group_key(&mut self, client: usize) -> K {
251        // This can only be called after we have just returned the first
252        // element of a group.
253        // Perform this by simply buffering one more element, grabbing the
254        // next key.
255        debug_assert!(!self.done);
256        debug_assert!(client == self.top_group);
257        debug_assert!(self.current_key.is_some());
258        debug_assert!(self.current_elt.is_none());
259        let old_key = self.current_key.take().unwrap();
260        if let Some(elt) = self.next_element() {
261            let key = self.key.call_mut(&elt);
262            if old_key != key {
263                self.top_group += 1;
264            }
265            self.current_key = Some(key);
266            self.current_elt = Some(elt);
267        }
268        old_key
269    }
270}
271
272impl<K, I, F> GroupInner<K, I, F>
273where
274    I: Iterator,
275{
276    /// Called when a group is dropped
277    fn drop_group(&mut self, client: usize) {
278        // It's only useful to track the maximal index
279        if self.dropped_group == !0 || client > self.dropped_group {
280            self.dropped_group = client;
281        }
282    }
283}
284
285/// `GroupBy` is the storage for the lazy grouping operation.
286///
287/// If the groups are consumed in their original order, or if each
288/// group is dropped without keeping it around, then `GroupBy` uses
289/// no allocations. It needs allocations only if several group iterators
290/// are alive at the same time.
291///
292/// This type implements [`IntoIterator`] (it is **not** an iterator
293/// itself), because the group iterators need to borrow from this
294/// value. It should be stored in a local variable or temporary and
295/// iterated.
296///
297/// See [`.group_by()`](crate::Itertools::group_by) for more information.
298#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
299pub struct GroupBy<K, I, F>
300where
301    I: Iterator,
302{
303    inner: RefCell<GroupInner<K, I, F>>,
304    // the group iterator's current index. Keep this in the main value
305    // so that simultaneous iterators all use the same state.
306    index: Cell<usize>,
307}
308
309/// Create a new
310pub fn new<K, J, F>(iter: J, f: F) -> GroupBy<K, J::IntoIter, F>
311where
312    J: IntoIterator,
313    F: FnMut(&J::Item) -> K,
314{
315    GroupBy {
316        inner: RefCell::new(GroupInner {
317            key: f,
318            iter: iter.into_iter(),
319            current_key: None,
320            current_elt: None,
321            done: false,
322            top_group: 0,
323            oldest_buffered_group: 0,
324            bottom_group: 0,
325            buffer: Vec::new(),
326            dropped_group: !0,
327        }),
328        index: Cell::new(0),
329    }
330}
331
332impl<K, I, F> GroupBy<K, I, F>
333where
334    I: Iterator,
335{
336    /// `client`: Index of group that requests next element
337    fn step(&self, client: usize) -> Option<I::Item>
338    where
339        F: FnMut(&I::Item) -> K,
340        K: PartialEq,
341    {
342        self.inner.borrow_mut().step(client)
343    }
344
345    /// `client`: Index of group
346    fn drop_group(&self, client: usize) {
347        self.inner.borrow_mut().drop_group(client);
348    }
349}
350
351impl<'a, K, I, F> IntoIterator for &'a GroupBy<K, I, F>
352where
353    I: Iterator,
354    I::Item: 'a,
355    F: FnMut(&I::Item) -> K,
356    K: PartialEq,
357{
358    type Item = (K, Group<'a, K, I, F>);
359    type IntoIter = Groups<'a, K, I, F>;
360
361    fn into_iter(self) -> Self::IntoIter {
362        Groups { parent: self }
363    }
364}
365
366/// An iterator that yields the Group iterators.
367///
368/// Iterator element type is `(K, Group)`:
369/// the group's key `K` and the group's iterator.
370///
371/// See [`.group_by()`](crate::Itertools::group_by) for more information.
372#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
373pub struct Groups<'a, K, I, F>
374where
375    I: Iterator + 'a,
376    I::Item: 'a,
377    K: 'a,
378    F: 'a,
379{
380    parent: &'a GroupBy<K, I, F>,
381}
382
383impl<'a, K, I, F> Iterator for Groups<'a, K, I, F>
384where
385    I: Iterator,
386    I::Item: 'a,
387    F: FnMut(&I::Item) -> K,
388    K: PartialEq,
389{
390    type Item = (K, Group<'a, K, I, F>);
391
392    #[inline]
393    fn next(&mut self) -> Option<Self::Item> {
394        let index = self.parent.index.get();
395        self.parent.index.set(index + 1);
396        let inner = &mut *self.parent.inner.borrow_mut();
397        inner.step(index).map(|elt| {
398            let key = inner.group_key(index);
399            (
400                key,
401                Group {
402                    parent: self.parent,
403                    index,
404                    first: Some(elt),
405                },
406            )
407        })
408    }
409}
410
411/// An iterator for the elements in a single group.
412///
413/// Iterator element type is `I::Item`.
414pub struct Group<'a, K, I, F>
415where
416    I: Iterator + 'a,
417    I::Item: 'a,
418    K: 'a,
419    F: 'a,
420{
421    parent: &'a GroupBy<K, I, F>,
422    index: usize,
423    first: Option<I::Item>,
424}
425
426impl<'a, K, I, F> Drop for Group<'a, K, I, F>
427where
428    I: Iterator,
429    I::Item: 'a,
430{
431    fn drop(&mut self) {
432        self.parent.drop_group(self.index);
433    }
434}
435
436impl<'a, K, I, F> Iterator for Group<'a, K, I, F>
437where
438    I: Iterator,
439    I::Item: 'a,
440    F: FnMut(&I::Item) -> K,
441    K: PartialEq,
442{
443    type Item = I::Item;
444    #[inline]
445    fn next(&mut self) -> Option<Self::Item> {
446        if let elt @ Some(..) = self.first.take() {
447            return elt;
448        }
449        self.parent.step(self.index)
450    }
451}
452
453///// IntoChunks /////
454
455/// Create a new
456pub fn new_chunks<J>(iter: J, size: usize) -> IntoChunks<J::IntoIter>
457where
458    J: IntoIterator,
459{
460    IntoChunks {
461        inner: RefCell::new(GroupInner {
462            key: ChunkIndex::new(size),
463            iter: iter.into_iter(),
464            current_key: None,
465            current_elt: None,
466            done: false,
467            top_group: 0,
468            oldest_buffered_group: 0,
469            bottom_group: 0,
470            buffer: Vec::new(),
471            dropped_group: !0,
472        }),
473        index: Cell::new(0),
474    }
475}
476
477/// `ChunkLazy` is the storage for a lazy chunking operation.
478///
479/// `IntoChunks` behaves just like `GroupBy`: it is iterable, and
480/// it only buffers if several chunk iterators are alive at the same time.
481///
482/// This type implements [`IntoIterator`] (it is **not** an iterator
483/// itself), because the chunk iterators need to borrow from this
484/// value. It should be stored in a local variable or temporary and
485/// iterated.
486///
487/// Iterator element type is `Chunk`, each chunk's iterator.
488///
489/// See [`.chunks()`](crate::Itertools::chunks) for more information.
490#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
491pub struct IntoChunks<I>
492where
493    I: Iterator,
494{
495    inner: RefCell<GroupInner<usize, I, ChunkIndex>>,
496    // the chunk iterator's current index. Keep this in the main value
497    // so that simultaneous iterators all use the same state.
498    index: Cell<usize>,
499}
500
501impl<I> Clone for IntoChunks<I>
502where
503    I: Clone + Iterator,
504    I::Item: Clone,
505{
506    clone_fields!(inner, index);
507}
508
509impl<I> IntoChunks<I>
510where
511    I: Iterator,
512{
513    /// `client`: Index of chunk that requests next element
514    fn step(&self, client: usize) -> Option<I::Item> {
515        self.inner.borrow_mut().step(client)
516    }
517
518    /// `client`: Index of chunk
519    fn drop_group(&self, client: usize) {
520        self.inner.borrow_mut().drop_group(client);
521    }
522}
523
524impl<'a, I> IntoIterator for &'a IntoChunks<I>
525where
526    I: Iterator,
527    I::Item: 'a,
528{
529    type Item = Chunk<'a, I>;
530    type IntoIter = Chunks<'a, I>;
531
532    fn into_iter(self) -> Self::IntoIter {
533        Chunks { parent: self }
534    }
535}
536
537/// An iterator that yields the Chunk iterators.
538///
539/// Iterator element type is `Chunk`.
540///
541/// See [`.chunks()`](crate::Itertools::chunks) for more information.
542#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
543#[derive(Clone)]
544pub struct Chunks<'a, I>
545where
546    I: Iterator + 'a,
547    I::Item: 'a,
548{
549    parent: &'a IntoChunks<I>,
550}
551
552impl<'a, I> Iterator for Chunks<'a, I>
553where
554    I: Iterator,
555    I::Item: 'a,
556{
557    type Item = Chunk<'a, I>;
558
559    #[inline]
560    fn next(&mut self) -> Option<Self::Item> {
561        let index = self.parent.index.get();
562        self.parent.index.set(index + 1);
563        let inner = &mut *self.parent.inner.borrow_mut();
564        inner.step(index).map(|elt| Chunk {
565            parent: self.parent,
566            index,
567            first: Some(elt),
568        })
569    }
570}
571
572/// An iterator for the elements in a single chunk.
573///
574/// Iterator element type is `I::Item`.
575pub struct Chunk<'a, I>
576where
577    I: Iterator + 'a,
578    I::Item: 'a,
579{
580    parent: &'a IntoChunks<I>,
581    index: usize,
582    first: Option<I::Item>,
583}
584
585impl<'a, I> Drop for Chunk<'a, I>
586where
587    I: Iterator,
588    I::Item: 'a,
589{
590    fn drop(&mut self) {
591        self.parent.drop_group(self.index);
592    }
593}
594
595impl<'a, I> Iterator for Chunk<'a, I>
596where
597    I: Iterator,
598    I::Item: 'a,
599{
600    type Item = I::Item;
601    #[inline]
602    fn next(&mut self) -> Option<Self::Item> {
603        if let elt @ Some(..) = self.first.take() {
604            return elt;
605        }
606        self.parent.step(self.index)
607    }
608}