#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time :
# @Author :
# @File : Image_Registration_test.py
import time

import cv2
import numpy as np
from ultralytics import YOLO

# Initialize the YOLOv8 model (yolov8n can be swapped for yolov8s/m/l, etc.).
yolo_model = YOLO("yolov8n.pt")
yolo_model.to('cuda')  # enable GPU acceleration


def sift_registration(img1, img2):
    # Normalize the first image to 0..255 and convert to uint8; the second
    # image is assumed to be grayscale uint8 already.
    img1gray = cv2.normalize(img1, dst=None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX).astype(np.uint8)
    img2gray = img2

    sift = cv2.SIFT_create()
    # Find the keypoints and descriptors with SIFT.
    kp1, des1 = sift.detectAndCompute(img1gray, None)
    kp2, des2 = sift.detectAndCompute(img2gray, None)

    # FLANN parameters.
    FLANN_INDEX_KDTREE = 1
    index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
    search_params = dict(checks=50)
    flann = cv2.FlannBasedMatcher(index_params, search_params)
    matches = flann.knnMatch(des1, des2, k=2)

    # Lowe's ratio test: keep a match if the best distance is below
    # 0.75x the second-best distance.
    good = []
    pts1 = []
    pts2 = []
    for m, n in matches:
        if m.distance < 0.75 * n.distance:
            good.append(m)
            pts2.append(kp2[m.trainIdx].pt)
            pts1.append(kp1[m.queryIdx].pt)

    MIN_MATCH_COUNT = 4
    if len(good) > MIN_MATCH_COUNT:
        src_pts = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
        dst_pts = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)
        M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
    else:
        print("Not enough matches are found - {}/{}".format(len(good), MIN_MATCH_COUNT))
        M = np.eye(3, dtype=np.float64)
    if M is None:
        # findHomography can fail and return None; fall back to the identity.
        M = np.eye(3, dtype=np.float64)
    return 1, M, len(pts2)


# Truncated linear RGB contrast stretch: discard values below the 2nd and above
# the 98th percentile (the lower and upper percentiles are usually equal) and
# map the remaining range linearly onto the output bounds.
def truncated_linear_stretch(image, truncated_value=2, maxout=255, min_out=0):
    """
    :param image: BGR image to stretch
    :param truncated_value: percentile to clip at both ends
    :param maxout: upper bound of the output range
    :param min_out: lower bound of the output range
    :return: contrast-stretched BGR image
    """
    def gray_process(gray, maxout=maxout, minout=min_out):
        truncated_down = np.percentile(gray, truncated_value)
        truncated_up = np.percentile(gray, 100 - truncated_value)
        # Map [truncated_down, truncated_up] linearly onto [minout, maxout].
        gray_new = (gray - truncated_down) * ((maxout - minout) / (truncated_up - truncated_down)) + minout
        gray_new[gray_new < minout] = minout
        gray_new[gray_new > maxout] = maxout
        return np.uint8(gray_new)

    (b, g, r) = cv2.split(image)
    b = gray_process(b)
    g = gray_process(g)
    r = gray_process(r)
    return cv2.merge((b, g, r))  # merge the channels back
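
# A minimal usage sketch for truncated_linear_stretch, illustrative only and not
# part of the original pipeline; the synthetic image and its 100..150 value range
# are assumptions made for the demo.
def _demo_truncated_stretch():
    # Synthetic low-contrast BGR image with values confined to 100..150.
    low_contrast = np.random.randint(100, 151, (64, 64, 3), dtype=np.uint8)
    stretched = truncated_linear_stretch(low_contrast, truncated_value=2)
    # After stretching, the values should span (close to) the full 0..255 range.
    print("before:", low_contrast.min(), low_contrast.max(),
          "after:", stretched.min(), stretched.max())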

# Registration between a daytime visible-light image and an infrared grayscale
# image: compute shared SIFT feature points (the original comment said "SURF",
# but the code uses SIFT) and estimate the homography between the two.
def Images_matching(img_base, img_target):
    """
    :param img_base: visible-light (BGR) base image
    :param img_target: grayscale image to register onto the base
    :return: (flag, homography matrix, number of matched points)
    """
    start = time.time()
    img_base = cv2.cvtColor(img_base, cv2.COLOR_BGR2GRAY)

    # Use SIFT to detect keypoints and compute their descriptors.
    sift = cv2.SIFT_create()
    st1 = time.time()
    kp1, des1 = sift.detectAndCompute(img_base, None)
    kp2, des2 = sift.detectAndCompute(img_target, None)
    en1 = time.time()
    # print(en1 - st1, "feature extraction")
    if des1 is None or des2 is None:
        # No descriptors found in one of the images.
        return 0, None, 0

    # KNN feature matching. A FLANN matcher (kd-tree with 5 trees, 50 checks)
    # is an alternative to the brute-force matcher used below:
    # FLANN_INDEX_KDTREE = 1
    # index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
    # search_params = dict(checks=50)
    # flann = cv2.FlannBasedMatcher(index_params, search_params)
    # matches = flann.knnMatch(des1, des2, k=2)
    st2 = time.time()
    matcher = cv2.BFMatcher()
    matches = matcher.knnMatch(des1, des2, k=2)
    de2 = time.time()
    # print(de2 - st2, "feature matching")

    # Lowe's ratio test: keep a match if the best distance is below
    # 0.75x the second-best distance.
    good = []
    for m, n in matches:
        if m.distance < 0.75 * n.distance:
            good.append(m)

    src_pts = np.array([kp1[m.queryIdx].pt for m in good])  # point coordinates in the query image
    dst_pts = np.array([kp2[m.trainIdx].pt for m in good])  # point coordinates in the train (template) image
    if len(src_pts) <= 4:
        print("Not enough matches are found - {}/{}".format(len(good), 4))
        return 0, None, 0
    else:
        # print(len(dst_pts), len(src_pts), "registration points")
        H = cv2.findHomography(dst_pts, src_pts, cv2.RANSAC, 4)  # H[0]: 3x3 matrix, H[1]: inlier mask
        end = time.time()
        times = end - start
        # print("registration time", times)
        return 1, H[0], len(dst_pts)


def fusions(img_vl, img_inf):
    """
    :param img_vl: visible-light (BGR) image
    :param img_inf: registered infrared grayscale image
    :return: fused BGR image
    """
    img_YUV = cv2.cvtColor(img_vl, cv2.COLOR_BGR2YUV)  # input is BGR
    # img_YUV = cv2.cvtColor(img_vl, cv2.COLOR_RGB2YUV)  # use this instead for RGB input
    y, u, v = cv2.split(img_YUV)  # split channels to get the Y (luma) channel
    # Blend the visible luma with the infrared intensity 50/50.
    Yf = y * 0.5 + img_inf * 0.5
    Yf = Yf.astype(np.uint8)
    # Convert back to BGR, OpenCV's channel order for display and saving.
    fusion = cv2.cvtColor(cv2.merge((Yf, u, v)), cv2.COLOR_YUV2BGR)
    return fusion


def removeBlackBorder(gray):
    """
    Remove the surplus black border from a stitched image.
    Input:  gray: 2-D numpy array, image to process
    Output: cropped image plus the crop bounds
    """
    threshold = 40  # intensity threshold separating content from border
    nrow = gray.shape[0]  # image size
    ncol = gray.shape[1]
    # Scan the centre column and centre row; this cannot distinguish the case
    # where the black region covers more than half of the image.
    rowc = gray[:, ncol // 2]  # centre column: intensities along the rows
    colc = gray[nrow // 2, :]  # centre row: intensities along the columns
    rowflag = np.argwhere(rowc > threshold)
    colflag = np.argwhere(colc > threshold)
    # left/right are row bounds, top/bottom are column bounds here.
    left, bottom, right, top = rowflag[0, 0], colflag[-1, 0], rowflag[-1, 0], colflag[0, 0]
    # cv2.imshow('name', gray[left:right, top:bottom])  # preview
    # cv2.waitKey(1)
    return gray[left:right, top:bottom], left, right, top, bottom


def main(matchimg_vi, matchimg_in):
    """
    :param matchimg_vi: visible-light image
    :param matchimg_in: infrared (grayscale) image
    :return: fused image with detection results drawn on it
    """
    try:
        orimg_vi = matchimg_vi
        orimg_in = matchimg_in
        h, w = orimg_vi.shape[:2]  # e.g. 480 x 640
        # Get the 3x3 homography mapping the infrared image onto the visible one.
        flag, H, dot = Images_matching(matchimg_vi, matchimg_in)
        if flag == 0:
            return 0, None, 0
        else:
            matched_ni = cv2.warpPerspective(orimg_in, H, (w, h))
            # matched_ni, left, right, top, bottom = removeBlackBorder(matched_ni)
            # fusion = fusions(orimg_vi[left:right, top:bottom], matched_ni)
            fusion = fusions(orimg_vi, matched_ni)
            # YOLOv8 object detection on the fused image.
            results = yolo_model(fusion)
            annotated_image = results[0].plot()  # draw detection boxes
            return 1, annotated_image, dot  # return the image with detections
    except Exception as e:
        print(f"Error in fusion/detection: {e}")
        return 0, None, 0
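
# A minimal sanity-check sketch, not part of the original pipeline: reproject the
# target image corners through a homography returned by Images_matching. The
# check_homography name and the corner-reprojection approach are additions here;
# wildly scattered corners indicate a degenerate registration.
def check_homography(H, w, h):
    # Corners of a w x h image, in the shape cv2.perspectiveTransform expects.
    corners = np.float32([[0, 0], [w, 0], [w, h], [0, h]]).reshape(-1, 1, 2)
    projected = cv2.perspectiveTransform(corners, H)
    return projected.reshape(-1, 2)  # 4x2 array of projected corner coordinates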

if __name__ == '__main__':
    time_all = 0
    dots = 0
    i = 0
    # fourcc = cv2.VideoWriter_fourcc(*'XVID')
    # capture = cv2.VideoCapture("video/20190926_141816_1_8/20190926_141816_1_8/infrared.mp4")
    # capture2 = cv2.VideoCapture("video/20190926_141816_1_8/20190926_141816_1_8/visible.mp4")
    # fps = capture.get(cv2.CAP_PROP_FPS)
    # out = cv2.VideoWriter('output2.mp4', fourcc, fps, (640, 480))
    #
    # # Read frames continuously.
    # while True:
    #     read_code, frame = capture.read()     # infrared frame
    #     read_code2, frame2 = capture2.read()  # visible frame
    #     if not read_code:
    #         break
    #     i += 1
    #     # frame = cv2.resize(frame, (1920, 1080))
    #     # frame2 = cv2.resize(frame2, (640, 512))
    #
    #     # Convert to grayscale (infrared preprocessing).
    #     frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    #
    #     # Fuse and detect via main().
    #     flag, fusion, dot = main(frame2, frame_gray)
    #
    #     if flag == 1:
    #         # Show the fused image with detection results.
    #         cv2.imshow("Fusion with YOLOv8 Detection", fusion)
    #         out.write(fusion)
    #
    #     if cv2.waitKey(1) == ord('q'):
    #         break
    #
    # # Release resources.
    # capture.release()
    # capture2.release()
    # cv2.destroyAllWindows()
    # ave = time_all / i
    # print(ave, "average time")

    # === Static image test ===
    # Paths to the visible-light and infrared input images.
    visible_path = "../test_images/visible.jpg"
    infrared_path = "../test_images/infrared.jpg"

    # Load the images.
    img_visible = cv2.imread(visible_path)
    img_infrared = cv2.imread(infrared_path)
    if img_visible is None or img_infrared is None:
        print("Error: failed to load the images, please check the paths!")
        exit()

    # Convert the infrared image to grayscale.
    img_inf_gray = cv2.cvtColor(img_infrared, cv2.COLOR_BGR2GRAY)

    # Run fusion and detection.
    flag, fusion_result, _ = main(img_visible, img_inf_gray)

    if flag == 1:
        # Show and save the result.
        cv2.imshow("Fusion with Detection", fusion_result)
        cv2.imwrite("../output/fusion_result.jpg", fusion_result)
        cv2.waitKey(0)
        cv2.destroyAllWindows()
    else:
        print("Fusion failed!")
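
    # A minimal batch-processing sketch, kept commented out like the video loop
    # above; it is an addition, and the glob pattern and visible_*/infrared_*
    # naming scheme are assumptions, not part of the original test setup.
    # import glob, os
    # for vis_path in glob.glob("../test_images/visible_*.jpg"):
    #     inf_path = vis_path.replace("visible_", "infrared_")
    #     vis = cv2.imread(vis_path)
    #     inf = cv2.imread(inf_path, cv2.IMREAD_GRAYSCALE)
    #     if vis is None or inf is None:
    #         continue
    #     ok, fused, _ = main(vis, inf)
    #     if ok:
    #         cv2.imwrite(os.path.join("../output", "fused_" + os.path.basename(vis_path)), fused)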