Files
box-secondary-development-3/快速接收设备告警/custom-api-demo/api_demo_tcp.py

69 lines
2.1 KiB
Python
Raw Normal View History

import json
import socket
# 导入日志
from logger import LOGGER
class Api:
"""
API类实现最简单的tcp通讯如果需要自动重连队列等业务代码请自行实现
"""
def __init__(self):
"""
Attributes:
self.ignore_result: 为True时不发送检测结果为False则发送检测结果
self.ignore_alert: 为True时不发送告警信息为False则发送告警信息
self.draw_image: 为True时告警图片会画上告警信息为False则不画
self.ignore_alert_video: 为True时不发送告警视频为False则发送
"""
self.ignore_result = True
self.ignore_alert = False
self.draw_image = True
self.ignore_alert_video = True
def send_result_callback(self, result):
"""
发送检测结果回调函数
Args:
result: 检测结果数据
Returns:
"""
pass
def send_alert_callback(self, alert):
"""
发送告警信息回调函数
Args:
alert: 告警数据
Returns:
"""
try:
LOGGER.info('发送TCP告警')
# 创建TCP socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# 连接到目标服务器(这里用示例地址和端口)
s.connect(('192.168.0.4', 10001))
# 为了方便查看日志去掉base64编码图片
alert.pop('image')
json_data = json.dumps(alert, ensure_ascii=False)
data = json_data.encode('utf-8')
# 发送完整数据
s.sendall(data)
LOGGER.info(f'告警已发送至TCP服务器: {data}')
except Exception as e:
LOGGER.error(f'发送TCP告警失败: {str(e)}')
def send_alert_video_callback(self, alert_video):
"""
发送告警视频回调函数
Args:
alert_video: 告警视频数据
Returns:
"""
pass