Safemotion Lib
Loading...
Searching...
No Matches
mmpose_trt_runner.py
Go to the documentation of this file.
1from mmdeploy.apis.utils import build_task_processor
2from mmdeploy.utils import get_input_shape, load_config
3import torch
4import numpy as np
5
6from smutils.utils_image import crop_image
7
8class MMPoseTRTRunner(object):
9 def __init__(self, deploy_cfg, model_cfg, backend_model, device='cuda', batch_size=1):
10 """
11 mmpose의 Deploy(TensorRT) 모델을 구동시키기 위한 클래스
12 args:
13 deploy_cfg (str): deploy config 파일 경로
14 model_cfg (str): 모델이 정의된 config 파일 경로
15 backend_model (list[str]): TensorRT 모델 저장 경로
16 device (str): 모델을 구동할 디바이스
17 batch_size (int): 모델의 입력 배치 사이즈
18 """
19 self.deploy_cfg, self.model_cfg = load_config(deploy_cfg, model_cfg)
20 self.task_processor = build_task_processor(self.model_cfg, self.deploy_cfg, device)
21 self.model = self.task_processor.build_backend_model(backend_model)
22 self.input_shape = get_input_shape(deploy_cfg)
23 self.batch_size = batch_size
25
26 def build_test_pipeline(self, cfg):
27 """
28 모델의 입력 생성을 위한 pipeline을 빌드하는 기능
29 registry의 TRANSFORMS을 사용함, 미리 빌드를 해두어야함
30 mmlab의 다른 클래스(mmdetect)을 함께 사용하면 mmpose를 사용하기 전에 registry의 설정이 추가적으로 필요함
31 args:
32 cfg : 모델 config 파일을 로드한 객체
33 return:
34 모델의 입력 생성을 위한 pipeline
35 """
36 from mmpose.registry import TRANSFORMS
37 from mmcv.transforms import Compose
38 test_pipeline = [
39 TRANSFORMS.build(c) for c in cfg.test_dataloader.dataset.pipeline
40 ]
41 test_pipeline = Compose(test_pipeline)
42 return test_pipeline
43
44 def make_src(self, image, image_id, bboxes=None):
45 """
46 박스영역을 잘라내고 잘라낸 이미지에 대한 정보를 생성하는 기능
47 args:
48 image (np.array): RGB 이미지
49 image_id (int): 이미지 아이디
50 bboxes (np.array): 객체 영역의 박스
51 return:
52 crop_images (list[np.array]): 박스 영역을 자른 이미지
53 bboxes (np.array): crop_images의 박스
54 track_ids (list[float]): 추적 아이디
55 image_ids (list[int]): 이미지 아이디 리스트
56 """
57 if bboxes is None: #입력 박스가 없을 경우, 영상하나가 입력 데이터임
58 h, w = image.shape[:2]
59 return [image], np.array([[0, 0, w, h]]), np.array([0]), [image_id]
60
61 num_person, dim = bboxes.shape
62 box_index = 1 if dim == 6 else 0 #박스 부분의 시작 인덱스
63 track_ids = bboxes[:, 0] if dim == 6 else np.arange(num_person) #추적 아이디, dim이 6이 아니면 0부터 각기 다르게 설정
64 image_ids = [image_id]*num_person #이미지 아이디는 모두 동일하게 부여됨
65 crop_images = crop_image(image, bboxes[:, box_index:box_index+4]) #영상 자르기
66
67 return crop_images, bboxes[:, box_index:box_index+4], track_ids, image_ids
68
69 def divide_into_batches(self, data, batch_size):
70 """
71 데이터를 배치 단위로 분할하는 기능
72 args:
73 data (list): 데이터
74 batch_size (int)): 배치 크기
75 return (list): 배치 단위로 분할된 데이터
76 """
77
78 #배치 사이즈가 1이면 그대로 반환
79 if batch_size == 1:
80 return data
81
82 #배치 단위로 분할
83 batches = [data[i:i+batch_size] for i in range(0, len(data), batch_size)]
84 return batches
85
86 def run_detector(self, images, bboxes=None):
87 """
88 포즈 추정기를 동작시키는 기능
89 args:
90 image (np.array): RGB 이미지
91 bboxes (np.array): 객체 영역의 박스
92 """
93
94 #입력 타입 체크
95 is_batch = isinstance(images, (list, tuple))
96
97 #입력 통일 시키기
98 images = [images] if not is_batch else images
99 if not is_batch:
100 bboxes = [bboxes]
101 elif bboxes is None:
102 bboxes = [None]*len(images)
103
104 #데이터 생성, 영역 자르기
105 crop_images, bbox_list, track_ids, image_ids = [], [], [], []
106 for image_id, (image, bbox) in enumerate(zip(images, bboxes)):
107 crops, bbs, tids, imids = self.make_src(image, image_id, bbox)
108 crop_images.extend(crops)
109 bbox_list.append(bbs)
110 track_ids.append(tids)
111 image_ids.extend(imids)
112
113 bbox_list = np.vstack(bbox_list)
114 track_ids = np.hstack(track_ids)
115
116 #배치 단위로 분할
117 crop_images = self.divide_into_batches(crop_images, self.batch_size)
118 with torch.no_grad():
119 results = []
120
121 for image in crop_images:
122 #모델 입력 생성
123 processed_input = self.task_processor.create_input(
124 image,
125 self.input_shape,
126 test_pipeline=self.test_pipeline
127 )[0]
128
129 #모델 구동
130 inference_result = self.model.test_step(processed_input)
131
132 #모델 출력 모으기
133 for item in inference_result:
134 results.append(item)
135
136 # results = [item for image in crop_images for item in self.model.test_step(self.task_processor.create_input(image, self.input_shape, test_pipeline=self.test_pipeline)[0])]
137
138 #출력 데이터 생성, 추적아이디, 박스, 스켈레톤 데이터가 모두 포함됨
139 pose_results = []
140 pre_img_id = -1
141 for res, bbox, track_id, img_id in zip(results, bbox_list, track_ids, image_ids):
142 keypoints = res.pred_instances.keypoints + bbox[:2] #스켈레톤, 원본 이미지 기준으로 좌표 변환
143 keypoint_scores = np.expand_dims(res.pred_instances.keypoint_scores, axis=-1) #스켈레톤 스코어
144 keypoints = np.dstack((keypoints, keypoint_scores)) #스켈레톤 좌표, 스코어 합치기
145
146 # [1, 17, 3] -> [17, 3]
147 if keypoints.ndim == 3:
148 keypoints = keypoints[0]
149
150 #원본 이미지가 다를 경우
151 if pre_img_id != img_id:
152 pre_img_id = img_id
153 pose_results.append([])
154
155 #출력 데이터 생성
156 data = {'track_id': track_id, 'bbox': bbox, 'keypoints': keypoints}
157 pose_results[-1].append(data)
158
159 #입력이 원본 이미지 한장일 경우
160 if is_batch:
161 return pose_results
162 else:
163 if len(pose_results) == 0:
164 return pose_results
165 return pose_results[0]
166
make_src(self, image, image_id, bboxes=None)
divide_into_batches(self, data, batch_size)
run_detector(self, images, bboxes=None)
__init__(self, deploy_cfg, model_cfg, backend_model, device='cuda', batch_size=1)