Safemotion Lib
Loading...
Searching...
No Matches
history_buffer.py
Go to the documentation of this file.
1#!/usr/bin/env python3
2# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
3
4import numpy as np
5from typing import List, Tuple
6
7
9 """
10 Track a series of scalar values and provide access to smoothed values over a
11 window or the global average of the series.
12 """
13
def __init__(self, max_length: int = 1000000):
    """
    Create an empty buffer.

    Args:
        max_length: capacity of the buffer, i.e. the largest number of
            (value, iteration) pairs kept at once. When the capacity is
            exhausted, old values are removed.
    """
    # Running statistics: number of values recorded and their mean.
    self._count: int = 0
    self._global_avg: float = 0
    # Window of stored (value, iteration) pairs and its capacity.
    self._data: List[Tuple[float, float]] = []
    self._max_length: int = max_length
25
def update(self, value: float, iteration: float = None):
    """
    Record a new scalar value produced at a given iteration.

    The (value, iteration) pair is appended to the buffer and any entries
    beyond ``self._max_length`` are evicted from the old end. The running
    global average is updated with every value, whether or not the value
    remains in the buffer.

    Args:
        value: the scalar to record.
        iteration: the iteration at which `value` was produced. Defaults
            to the number of values recorded so far.
    """
    if iteration is None:
        iteration = self._count
    self._data.append((value, iteration))

    # Trim after appending instead of popping exactly one element when
    # the length equals the capacity. For a buffer that is exactly full
    # the outcome is the same, but this also copes with a non-positive
    # max_length (nothing is retained, rather than popping from an empty
    # list) and with a buffer that has grown past its capacity.
    overflow = len(self._data) - max(self._max_length, 0)
    if overflow > 0:
        del self._data[:overflow]

    self._count += 1
    # Incremental mean update; no running sum of all values is kept.
    self._global_avg += (value - self._global_avg) / self._count
40
def latest(self):
    """
    Return the scalar value most recently added to the buffer.

    Raises:
        IndexError: if the buffer is empty.
    """
    newest_entry = self._data[-1]
    return newest_entry[0]
46
def median(self, window_size: int):
    """
    Return the median of the latest `window_size` values in the buffer.

    If fewer than `window_size` values are stored, all of them are used.
    The window is taken with the slice ``[-window_size:]``, so a
    `window_size` of 0 selects the whole buffer.
    """
    recent_entries = self._data[-window_size:]
    recent_values = [entry[0] for entry in recent_entries]
    return np.median(recent_values)
52
def avg(self, window_size: int):
    """
    Return the mean of the latest `window_size` values in the buffer.

    If fewer than `window_size` values are stored, all of them are used.
    The window is taken with the slice ``[-window_size:]``, so a
    `window_size` of 0 selects the whole buffer.
    """
    recent_entries = self._data[-window_size:]
    recent_values = [entry[0] for entry in recent_entries]
    return np.mean(recent_values)
58
def global_avg(self):
    """
    Return the running mean of the whole series.

    This covers every value recorded so far, including values that have
    since been removed from the buffer due to limited storage.
    """
    running_mean = self._global_avg
    return running_mean
65
def values(self):
    """
    Returns:
        list[(number, iteration)]: content of the current buffer. This is
        the internal list itself, not a copy.
    """
    buffer_contents = self._data
    return buffer_contents
__init__(self, int max_length=1000000)
update(self, float value, float iteration=None)