Safemotion Lib
Loading...
Searching...
No Matches
optical_flow_motion.py
Go to the documentation of this file.
1import numpy as np
2import cv2
3import scipy.linalg
4
5class OpticalFlowMotion(object):
6 def __init__(self,):
7 self.termcriteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
8 self.feature_extract = cv2.FastFeatureDetector_create(30)
9 self.prev_image = None
10 self.prev_pts = None
11 self.frame_id = -1
12 self.w = 0.7
13 self.use_score = True
14 self.init_check = False
15
16 def collect_pts_in_bbox(self, bbox, pts):
17
18 case1 = pts[:, 0, 0] > bbox[0]
19 case2 = pts[:, 0, 1] > bbox[1]
20 case3 = pts[:, 0, 0] < bbox[2]
21 case4 = pts[:, 0, 1] < bbox[3]
22 valid = case1 & case2 & case3 & case4
23
24 return valid
25
26 def preprocessing(self, image_gray, mask):
27 self.cur_pts = self.feature_extract.detect(image_gray, mask)
28 if len(self.cur_pts) == 0:
29 self.cur_pts = np.zeros((0, 1, 2), dtype=float)
30 else:
31 self.cur_pts = cv2.KeyPoint_convert(self.cur_pts)
32 self.cur_pts = self.cur_pts.reshape(-1, 1, 2)
33
34 if self.init_check and len(self.prev_pts) != 0:
35 p2n_pts, status_p2n, err = cv2.calcOpticalFlowPyrLK(self.prev_image, image_gray, self.prev_pts, None, criteria=self.termcriteria)
36 n2p_pts, status_n2p, err = cv2.calcOpticalFlowPyrLK(image_gray, self.prev_image, p2n_pts, None, criteria=self.termcriteria)
37
38 valid1 = (status_p2n==1) & (status_n2p==1)
39 dpt = self.prev_pts-n2p_pts
40 dpt = dpt**2
41 dist = (dpt[:, :, 0] + dpt[:, :, 1])
42 valid2 = dist < 2.25 #1.5픽셀 이하
43 valid = valid1 & valid2
44 self.prev_pts = self.prev_pts[valid].reshape(-1,1,2)
45 self.p2n_pts = p2n_pts[valid].reshape(-1,1,2)
46
47 self.init_check = True
48 self.prev_image = image_gray
49
50 def postprocessing(self):
51 self.prev_pts = self.cur_pts
52
53 def initiate(self, measurement):
54 return measurement, True
55
56 def predict(self, bbox):
57 valid_check = False
58 if len(self.prev_pts) == 0:
59 return bbox, valid_check
60
61 box_pt_idx = self.collect_pts_in_bbox(bbox, self.prev_pts)
62 pPt = self.prev_pts[box_pt_idx]
63 nPt = self.p2n_pts[box_pt_idx]
64
65 if len(pPt) != 0:
66
67 px1 = pPt[:, 0, 0].min()
68 py1 = pPt[:, 0, 1].min()
69 px2 = pPt[:, 0, 0].max()
70 py2 = pPt[:, 0, 1].max()
71 pmx1 = bbox[0] - px1
72 pmy1 = bbox[1] - py1
73 pmx2 = bbox[2] - px2
74 pmy2 = bbox[3] - py2
75
76 nx1 = nPt[:, 0, 0].min()
77 ny1 = nPt[:, 0, 1].min()
78 nx2 = nPt[:, 0, 0].max()
79 ny2 = nPt[:, 0, 1].max()
80
81 valid_check = True
82 predict_box = np.array([nx1 + pmx1, ny1 + pmy1, nx2 + pmx2, ny2 + pmy2], dtype=float)
83 else:
84 predict_box = np.array([bbox[0], bbox[1], bbox[2], bbox[3]], dtype=float)
85
86 return predict_box, valid_check
87
88 def update(self, bbox, measurement):
89 if self.use_score:
90 score_w = (min(measurement[-1], 0.8) - 0.1) / 0.7
91 p_w = (1-score_w) + self.w
92 m_w = score_w + (1-self.w)
93 s_w = p_w+m_w
94 p_w = p_w/s_w
95 m_w = m_w/s_w
96 else:
97 p_w = self.w
98 m_w = (1-self.w)
99
100 return p_w*bbox + m_w*measurement[:4]