1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
|
# ocr源码
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
import ujson
from media.media import *
from media.sensor import *
from time import *
import nncase_runtime as nn
import ulab.numpy as np
import time
import image
import aicube
import random
import gc
import sys
from machine import UART,FPIOA
# 自定义OCR检测类
class OCRDetectionApp(AIBase):
def __init__(self,
kmodel_path,
model_input_size,
mask_threshold=0.5,
box_threshold=0.2,
rgb888p_size=[224,224],
display_size=[1920,1080],
debug_mode=0):
super().__init__(kmodel_path,model_input_size,rgb888p_size,debug_mode)
self.kmodel_path=kmodel_path
# 模型输入分辨率
self.model_input_size=model_input_size
# 分类阈值
self.mask_threshold=mask_threshold
self.box_threshold=box_threshold
# sensor给到AI的图像分辨率
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 显示分辨率
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
self.debug_mode=debug_mode
# Ai2d实例,用于实现模型预处理
self.ai2d=Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT,nn.ai2d_format.NCHW_FMT,np.uint8, np.uint8)
# 配置预处理操作,这里使用了pad和resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/libs/AI2D.py查看
def config_preprocess(self,input_image_size=None):
with ScopedTiming("set preprocess config",self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,您可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size=input_image_size if input_image_size else self.rgb888p_size
top,bottom,left,right=self.get_padding_param()
self.ai2d.pad([0,0,0,0,top,bottom,left,right], 0, [0,0,0])
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义当前任务的后处理
def postprocess(self,results):
with ScopedTiming("postprocess",self.debug_mode > 0):
# chw2hwc
hwc_array=self.chw2hwc(self.cur_img)
# 这里使用了aicube封装的接口ocr_post_process做后处理,返回的det_boxes结构为[[crop_array_nhwc,[p1_x,p1_y,p2_x,p2_y,p3_x,p3_y,p4_x,p4_y]],...]
det_boxes = aicube.ocr_post_process(results[0][:,:,:,0].reshape(-1), hwc_array.reshape(-1),self.model_input_size,self.rgb888p_size, self.mask_threshold, self.box_threshold)
return det_boxes
# 计算padding参数,在config_preprocess中使用
def get_padding_param(self):
# 右padding或下padding
dst_w = self.model_input_size[0]
dst_h = self.model_input_size[1]
input_width = self.rgb888p_size[0]
input_high = self.rgb888p_size[1]
ratio_w = dst_w / input_width
ratio_h = dst_h / input_high
if ratio_w < ratio_h:
ratio = ratio_w
else:
ratio = ratio_h
new_w = (int)(ratio * input_width)
new_h = (int)(ratio * input_high)
dw = (dst_w - new_w) / 2
dh = (dst_h - new_h) / 2
top = (int)(round(0))
bottom = (int)(round(dh * 2 + 0.1))
left = (int)(round(0))
right = (int)(round(dw * 2 - 0.1))
return top, bottom, left, right
# chw2hwc
def chw2hwc(self,features):
ori_shape = (features.shape[0], features.shape[1], features.shape[2])
c_hw_ = features.reshape((ori_shape[0], ori_shape[1] * ori_shape[2]))
hw_c_ = c_hw_.transpose()
new_array = hw_c_.copy()
hwc_array = new_array.reshape((ori_shape[1], ori_shape[2], ori_shape[0]))
del c_hw_
del hw_c_
del new_array
return hwc_array
# 自定义OCR识别任务类
class OCRRecognitionApp(AIBase):
def __init__(self,kmodel_path,model_input_size,dict_path,rgb888p_size=[1920,1080],display_size=[1920,1080],debug_mode=0):
super().__init__(kmodel_path,model_input_size,rgb888p_size,debug_mode)
# kmodel路径
self.kmodel_path=kmodel_path
# 识别模型输入分辨率
self.model_input_size=model_input_size
self.dict_path=dict_path
# sensor给到AI的图像分辨率,宽16字节对齐
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 视频输出VO分辨率,宽16字节对齐
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
# debug模式
self.debug_mode=debug_mode
self.dict_word=None
# 读取OCR的字典
self.read_dict()
self.ai2d=Ai2d(debug_mode)
self.ai2d.set_ai2d_dtype(nn.ai2d_format.RGB_packed,nn.ai2d_format.NCHW_FMT,np.uint8, np.uint8)
# 配置预处理操作,这里使用了pad和resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/libs/AI2D.py查看
def config_preprocess(self,input_image_size=None,input_np=None):
with ScopedTiming("set preprocess config",self.debug_mode > 0):
ai2d_input_size=input_image_size if input_image_size else self.rgb888p_size
top,bottom,left,right=self.get_padding_param(ai2d_input_size,self.model_input_size)
self.ai2d.pad([0,0,0,0,top,bottom,left,right], 0, [0,0,0])
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
# 如果传入input_np,输入shape为input_np的shape,如果不传入,输入shape为[1,3,ai2d_input_size[1],ai2d_input_size[0]]
self.ai2d.build([input_np.shape[0],input_np.shape[1],input_np.shape[2],input_np.shape[3]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义后处理,results是模型输出的array列表
def postprocess(self,results):
with ScopedTiming("postprocess",self.debug_mode > 0):
preds = np.argmax(results[0], axis=2).reshape((-1))
output_txt = ""
for i in range(len(preds)):
# 当前识别字符不是字典的最后一个字符并且和前一个字符不重复(去重),加入识别结果字符串
if preds[i] != (len(self.dict_word) - 1) and (not (i > 0 and preds[i - 1] == preds[i])):
output_txt = output_txt + self.dict_word[preds[i]]
return output_txt
# 计算padding参数
def get_padding_param(self,src_size,dst_size):
# 右padding或下padding
dst_w = dst_size[0]
dst_h = dst_size[1]
input_width = src_size[0]
input_high = src_size[1]
ratio_w = dst_w / input_width
ratio_h = dst_h / input_high
if ratio_w < ratio_h:
ratio = ratio_w
else:
ratio = ratio_h
new_w = (int)(ratio * input_width)
new_h = (int)(ratio * input_high)
dw = (dst_w - new_w) / 2
dh = (dst_h - new_h) / 2
top = (int)(round(0))
bottom = (int)(round(dh * 2 + 0.1))
left = (int)(round(0))
right = (int)(round(dw * 2 - 0.1))
return top, bottom, left, right
def read_dict(self):
if self.dict_path!="":
with open(dict_path, 'r') as file:
line_one = file.read(100000)
line_list = line_one.split("\r\n")
self.dict_word = {num: char.replace("\r", "").replace("\n", "") for num, char in enumerate(line_list)}
class OCRDetRec:
def __init__(self,
ocr_det_kmodel,
ocr_rec_kmodel,
det_input_size,
rec_input_size,
dict_path,
mask_threshold=0.25,
box_threshold=0.3,
rgb888p_size=[1920,1080],
display_size=[1920,1080],
debug_mode=0):
# OCR检测模型路径
self.ocr_det_kmodel=ocr_det_kmodel
# OCR识别模型路径
self.ocr_rec_kmodel=ocr_rec_kmodel
# OCR检测模型输入分辨率
self.det_input_size=det_input_size
# OCR识别模型输入分辨率
self.rec_input_size=rec_input_size
# 字典路径
self.dict_path=dict_path
# 置信度阈值
self.mask_threshold=mask_threshold
# nms阈值
self.box_threshold=box_threshold
# sensor给到AI的图像分辨率,宽16字节对齐
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 视频输出VO分辨率,宽16字节对齐
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
# debug_mode模式
self.debug_mode=debug_mode
self.ocr_det=OCRDetectionApp(self.ocr_det_kmodel,
model_input_size=self.det_input_size,mask_threshold=self.mask_threshold,box_threshold=self.box_threshold,rgb888p_size=self.rgb888p_size,display_size=self.display_size,
debug_mode=0)
self.ocr_rec=OCRRecognitionApp(self.ocr_rec_kmodel,
model_input_size=self.rec_input_size,dict_path=self.dict_path,
rgb888p_size=self.rgb888p_size,display_size=self.display_size)
self.ocr_det.config_preprocess()
# run函数
def run(self,input_np):
# 先进行OCR检测
det_res=self.ocr_det.run(input_np)
boxes=[]
ocr_res=[]
for det in det_res:
# 对得到的每个检测框执行OCR识别
self.ocr_rec.config_preprocess(input_image_size=[det[0].shape[2],det[0].shape[1]],input_np=det[0])
ocr_str=self.ocr_rec.run(det[0])
ocr_res.append(ocr_str)
boxes.append(det[1])
gc.collect()
return boxes,ocr_res
# 绘制OCR检测识别效果
def draw_result(self,pl,det_res,rec_res):
pl.osd_img.clear()
if det_res:
# 循环绘制所有检测到的框
for j in range(len(det_res)):
# 将原图的坐标点转换成显示的坐标点,循环绘制四条直线,得到一个矩形框
for i in range(4):
x1 = det_res[j][(i * 2)] / self.rgb888p_size[0] * self.display_size[0]
y1 = det_res[j][(i * 2 + 1)] / self.rgb888p_size[1] * self.display_size[1]
x2 = det_res[j][((i + 1) * 2) % 8] / self.rgb888p_size[0] * self.display_size[0]
y2 = det_res[j][((i + 1) * 2 + 1) % 8] / self.rgb888p_size[1] * self.display_size[1]
pl.osd_img.draw_line((int(x1), int(y1), int(x2), int(y2)), color=(255, 0, 0, 255),thickness=5)
# 在检测框位置显示识别的文字
pl.osd_img.draw_string_advanced(int(x1),int(y1),32,rec_res[j],color=(0,0,255))
if __name__=="__main__":
UART_TX_PIN = 5
UART_RX_PIN = 6
UART_ID = 2
BAUDRATE = 115200
# 初始化fpioa和uart
fpioa = FPIOA()
fpioa.set_function(UART_TX_PIN, FPIOA.UART2_TXD)
fpioa.set_function(UART_RX_PIN, FPIOA.UART2_RXD)
uart = UART(UART_ID , BAUDRATE)
print(fpioa.help(12))
print(fpioa.help(13))
# 显示模式,可以选择"hdmi"、"lcd3_5"(3.5寸mipi屏)和"lcd2_4"(2.4寸mipi屏)
# 配置显示模式
display="lcd3_5"
if display=="hdmi":
display_mode='hdmi'
display_size=[1920,1080]
elif display=="lcd3_5":
display_mode= 'st7701'
display_size=[800,480]
elif display=="lcd2_4":
display_mode= 'st7701'
display_size=[640,480]
rgb888p_size=[640,360] #特殊尺寸定义
# OCR检测模型路径
ocr_det_kmodel_path="/sdcard/examples/kmodel/ocr_det_int16.kmodel"
# OCR识别模型路径
ocr_rec_kmodel_path="/sdcard/examples/kmodel/ocr_rec_int16.kmodel"
# 其他参数
dict_path="/sdcard/examples/utils/dict.txt"
# 系统参数 不可改
ocr_det_input_size=[640,640]
ocr_rec_input_size=[512,32]
# 可改
mask_threshold=0.25
box_threshold=0.3
# 初始化PipeLine,只关注传给AI的图像分辨率,显示的分辨率
pl=PipeLine(rgb888p_size=rgb888p_size,
display_size=display_size,
display_mode=display_mode)
if display =="lcd2_4":
pl.create(Sensor(width=1280, height=960)) # 创建PipeLine实例,画面4:3
else:
pl.create(Sensor(width=1920, height=1080)) # 创建PipeLine实例
ocr=OCRDetRec(ocr_det_kmodel_path,
ocr_rec_kmodel_path,
det_input_size=ocr_det_input_size,rec_input_size=ocr_rec_input_size,
dict_path=dict_path,
mask_threshold=mask_threshold,
box_threshold=box_threshold,
rgb888p_size=rgb888p_size,
display_size=display_size)
clock = time.clock()
# 发送数据的细节处理函数
def send_ocr_data(text, x, y):
try:
# 数据清洗:去掉文字里可能存在的逗号或换行,防止破坏协议
clean_text = text.replace(',', '.').replace('\n', '').strip() # 针对字符串类型数据
# 协议格式: @文字,X,Y#
packet = "@{},{},{}#".format(clean_text, x, y)
# 编码并发送
uart.write(packet.encode('utf-8'))
print(f"[UART发送] {packet}") # 调试用
except Exception as e:
print(f"发送失败: {e}")
# 发送时间
last_send_time = 0
SEND_INTERVAL_MS = 100 # 发送间隔100ms 每秒最多10张
while True:
# clock.tick()
os.exitpoint() # 防卡死机制
img=pl.get_frame() # 获取当前帧
boxes,rec_res=ocr.run(img) # 推理当前帧
if boxes:
ocr.draw_result(pl,boxes,rec_res) # 绘制当前帧推理结果
print(boxes,rec_res) # 打印结果
pl.show_image() # 展示当前帧推理结果
current_time = time.ticks_ms()
can_send = (time.ticks_diff(current_time, last_send_time) >= SEND_INTERVAL_MS)
if can_send:
# target = det_res[0] # 只发送第一个检测框
raw_text = rec_res[0] # 文本
rect = boxes[0] # 检测框四个角的坐标
# 计算检测框中心点坐标
center_x = (rect[0] + rect[2] + rect[4] + rect[6]) / 4
center_y = (rect[1] + rect[3] + rect[5] + rect[7]) / 4
# 2. 新增:转换为 STM32 的 240x320 坐标
# 原图宽 640 -> 目标宽 240
scaled_x = int((center_x / 640) * 1920)
# 原图高 360 -> 目标高 320
scaled_y = int((center_y / 360) * 1080)
# 发送检测框中心点坐标
send_ocr_data(raw_text, scaled_x, scaled_y)
last_send_time = current_time
gc.collect()
|