mqtt修改

This commit is contained in:
2026-04-16 17:49:48 +08:00
parent c6f6226c02
commit db9d672c20
2 changed files with 210 additions and 338 deletions

View File

@@ -7,6 +7,8 @@ import android.os.Looper;
import android.app.Service;
import android.content.Intent;
import android.os.IBinder;
import android.os.PowerManager;
import android.net.wifi.WifiManager;
import android.util.Log;
import android.os.Build;
import android.app.NotificationChannel;
@@ -109,10 +111,21 @@ public class Main extends Service {
// ================= MQTT 服务 =================
private MqttClient mqttClient;
private static final String MQTT_BROKER_URL = "tcp://192.168.0.105:1883";
private static final String MQTT_BROKER_URL = "ws://emqx.pineappletech.cn";
private static final String MQTT_CLIENT_ID_PREFIX = "pico_";
private ScheduledExecutorService mqttScheduler;
private static final long MQTT_REPORT_INTERVAL = 10; // 10秒上报一次
private static final long MQTT_REPORT_INTERVAL = 20; // 20秒上报一次
private static final int MQTT_KEEP_ALIVE = 60; // KeepAlive 60秒
private static final int MQTT_CONNECTION_TIMEOUT = 30; // 连接超时30秒
private volatile boolean isMqttReconnecting = false;
private int mqttReconnectAttempts = 0;
private static final int MAX_RECONNECT_ATTEMPTS = 10;
private static final long RECONNECT_DELAY_MS = 5000; // 重连延迟5秒
// ================= 唤醒锁 =================
private PowerManager.WakeLock wakeLock;
private WifiManager.WifiLock wifiLock;
private static final String WAKE_LOCK_TAG = "PineappleService::WakeLock";
// ================= 服务生命周期 =================
@Override
@@ -178,12 +191,56 @@ public class Main extends Service {
// 启动Http服务
startHttpReceiver();
// 初始化唤醒锁
initWakeLocks();
// 启动MQTT服务
initMqttService();
// return START_STICKY;
}
/**
* 初始化唤醒锁,防止设备进入 Doze 模式导致 MQTT 断线
*/
private void initWakeLocks() {
try {
// 获取 PowerManager 并创建 WakeLock
PowerManager powerManager = (PowerManager) getSystemService(Context.POWER_SERVICE);
wakeLock = powerManager.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, WAKE_LOCK_TAG);
wakeLock.setReferenceCounted(false);
wakeLock.acquire();
Log.i(TAG, "唤醒锁已获取");
// 获取 WifiManager 并创建 WifiLock
WifiManager wifiManager = (WifiManager) getApplicationContext().getSystemService(Context.WIFI_SERVICE);
wifiLock = wifiManager.createWifiLock(WifiManager.WIFI_MODE_FULL_HIGH_PERF, WAKE_LOCK_TAG);
wifiLock.setReferenceCounted(false);
wifiLock.acquire();
Log.i(TAG, "WiFi 锁已获取");
} catch (Exception e) {
Log.e(TAG, "初始化唤醒锁失败: " + e.getMessage());
}
}
/**
* 释放唤醒锁
*/
private void releaseWakeLocks() {
try {
if (wakeLock != null && wakeLock.isHeld()) {
wakeLock.release();
Log.i(TAG, "唤醒锁已释放");
}
if (wifiLock != null && wifiLock.isHeld()) {
wifiLock.release();
Log.i(TAG, "WiFi 锁已释放");
}
} catch (Exception e) {
Log.e(TAG, "释放唤醒锁失败: " + e.getMessage());
}
}
@Override
public void onDestroy() {
super.onDestroy();
@@ -217,6 +274,9 @@ public class Main extends Service {
// 断开MQTT连接
disconnectMqtt();
// 释放唤醒锁
releaseWakeLocks();
}
private String getIpAddressFromPico() {
@@ -791,14 +851,18 @@ public class Main extends Service {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
options.setConnectionTimeout(MQTT_CONNECTION_TIMEOUT);
options.setKeepAliveInterval(MQTT_KEEP_ALIVE);
options.setAutomaticReconnect(true);
// 设置最大重连间隔为60秒
options.setMaxReconnectDelay(60000);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
Log.e(TAG, "MQTT 连接丢失: " + cause.getMessage());
// 触发重连
scheduleMqttReconnect();
}
@Override
@@ -814,12 +878,60 @@ public class Main extends Service {
mqttClient.connect(options);
Log.i(TAG, "MQTT 连接成功 - ClientID: " + clientId);
// 重置重连计数
mqttReconnectAttempts = 0;
isMqttReconnecting = false;
// 启动定时上报任务
startMqttReportTask();
} catch (MqttException e) {
Log.e(TAG, "MQTT 连接失败: " + e.getMessage());
// 连接失败也触发重连
scheduleMqttReconnect();
}
});
}
/**
* 调度 MQTT 重连
*/
private void scheduleMqttReconnect() {
if (isMqttReconnecting) {
Log.d(TAG, "MQTT 重连已在进行中,跳过");
return;
}
if (mqttReconnectAttempts >= MAX_RECONNECT_ATTEMPTS) {
Log.e(TAG, "MQTT 重连次数超过最大限制,停止重连");
return;
}
isMqttReconnecting = true;
mqttReconnectAttempts++;
long delay = Math.min(RECONNECT_DELAY_MS * mqttReconnectAttempts, 60000); // 最大延迟60秒
Log.i(TAG, "MQTT 将在 " + delay + "ms 后进行第 " + mqttReconnectAttempts + " 次重连");
networkExecutor.execute(() -> {
try {
Thread.sleep(delay);
// 清理旧连接
if (mqttClient != null) {
try {
mqttClient.disconnectForcibly();
mqttClient.close();
} catch (Exception e) {
// 忽略清理错误
}
mqttClient = null;
}
// 重新连接
isMqttReconnecting = false;
connectToMqttBroker();
} catch (InterruptedException e) {
Log.e(TAG, "MQTT 重连等待被中断: " + e.getMessage());
isMqttReconnecting = false;
}
});
}