0
0
Lập trình
Sơn Tùng Lê
Sơn Tùng Lê103931498422911686980

Hướng dẫn xây dựng script Node.js để phân tích dữ liệu Eelink MQTT

Đăng vào 6 tháng trước

• 6 phút đọc

Hướng dẫn xây dựng script Node.js để phân tích dữ liệu Eelink MQTT

Giới thiệu

Trong bài viết này, chúng ta sẽ cùng nhau xây dựng một script Node.js để phân tích dữ liệu từ các thiết bị IoT sử dụng giao thức MQTT, đặc biệt là dữ liệu từ các thiết bị Eelink. Script này sẽ tự động phát hiện các kiểu mã hóa dữ liệu như JSON và Hex/TLV, sau đó chuẩn hóa chúng thành một định dạng JSON duy nhất để dễ dàng lưu trữ, theo dõi và phân tích. Đây là một công cụ hữu ích cho các kỹ sư IoT, kỹ sư dữ liệu và bất kỳ ai muốn biến dữ liệu thô từ thiết bị thành các định dạng sẵn sàng cho phân tích.

Mục lục

  1. Yêu cầu và cài đặt
  2. Ví dụ về chủ đề và payload
  3. Chiến lược phân tích
  4. Script hoàn chỉnh
  5. Chạy script và ví dụ đầu ra
  6. Cạm bẫy và bước tiếp theo

1) Yêu cầu và cài đặt

Trước khi bắt đầu, hãy đảm bảo rằng bạn đã cài đặt Node.js phiên bản 18 trở lên và có quyền truy cập vào một broker MQTT mà bạn có thể sử dụng (có thể là local hoặc hosted). Tiến hành cài đặt các dependencies cần thiết bằng cách chạy:

bash Copy
npm init -y
npm install mqtt dotenv yargs

Tiếp theo, tạo một tệp .env với nội dung như sau:

bash Copy
MQTT_URL=mqtts://broker.example.com:8883
MQTT_USERNAME=your-username
MQTT_PASSWORD=your-password
MQTT_TOPIC=devices/+/up

Chủ đề devices/+/up sẽ đăng ký tất cả các thiết bị (dấu + đại diện cho một cấp độ). Bạn có thể điều chỉnh theo sơ đồ của bạn.

2) Ví dụ về chủ đề và payload

Dưới đây là hai ví dụ về payload mà bạn có thể nhận từ thiết bị:

Payload JSON (thông thường)

json Copy
{
  "deviceId": "ELK-123456",
  "ts": "2025-09-08T06:01:22Z",
  "gnss": { "lat": 22.543096, "lon": 114.057865, "speedKph": 48.3, "heading": 180 },
  "batteryPct": 87,
  "status": ["IGN_ON", "MOVING"]
}

Payload Hex/TLV (ví dụ)

text Copy
# TLV sử dụng Tag(1B) + Len(1B) + Value(N) — các trường ví dụ:
01 07 454C4B2D313233343536   # tag=0x01 deviceId ASCII("ELK-123456")
02 04 66 A2 9C 16            # tag=0x02 ts epoch (uint32 BE)
03 04 01 35 E0 F0            # tag=0x03 lat_i32 /1e6
04 04 04 4D A4 A1            # tag=0x04 lon_i32 /1e6
05 02 01 E5                  # tag=0x05 speed 0.1 kph
06 01 57                     # tag=0x06 battery pct

3) Chiến lược phân tích

Để chuẩn hóa các payload này, chúng ta sẽ phát hiện loại dữ liệu đầu vào và chuyển đổi chúng thành một định dạng nhất quán. Script sẽ:

  • Phát hiện JSON: Các payload bắt đầu bằng { hoặc [ sẽ được phân tích như JSON.
  • Phát hiện Base64: Các chuỗi có thể giải mã thành JSON hoặc TLV hợp lệ khi chạy qua Buffer.from(s, 'base64').
  • Phát hiện Hex/TLV: Các chuỗi hex thuần với chiều dài chẵn sẽ được phân tích như TLV; các tag định nghĩa các trường như deviceId, timestamp, tọa độ, tốc độ, pin, v.v.
  • Fallback: Nếu không có trường hợp nào trên áp dụng, coi thông điệp như văn bản thuần và bao gồm nó như _raw để kiểm tra.

Định dạng đầu ra trông như sau:

json Copy
{
  "sourceTopic": "string",
  "rawFormat": "json" | "base64|json" | "base64|tlv" | "hex|tlv" | "text",
  "deviceId"?: "string",
  "ts"?: "string",
  "lat"?: "number",
  "lon"?: "number",
  "speedKph"?: "number",
  "heading"?: "number",
  "batteryPct"?: "number",
  "statusFlags"?: { "ignOn"?: "boolean", "moving"?: "boolean", "tamper"?: "boolean" },
  "_raw"?: "string"
}

4) Script hoàn chỉnh

Dưới đây là script Node.js hoàn chỉnh (parse-eelink-mqtt.js). Nó sẽ đăng ký vào chủ đề, giải mã từng payload, chuẩn hóa các trường và viết kết quả ra stdout hoặc tệp NDJSON. Hãy thay thế bộ phân tích TLV với giao thức chính thức của bạn trong môi trường sản xuất.

javascript Copy
#!/usr/bin/env node
/* eslint-disable no-console */

const mqtt = require('mqtt');
const fs = require('fs');
const path = require('path');
const { hideBin } = require('yargs/helpers');
const yargs = require('yargs/yargs');
require('dotenv').config();

const argv = yargs(hideBin(process.argv))
  .option('broker', { type: 'string', default: process.env.MQTT_URL })
  .option('username', { type: 'string', default: process.env.MQTT_USERNAME })
  .option('password', { type: 'string', default: process.env.MQTT_PASSWORD })
  .option('topic', { type: 'string', default: process.env.MQTT_TOPIC || 'devices/+/up' })
  .option('save', { type: 'string' })
  .option('insecure', { type: 'boolean', default: false })
  .help().argv;

const outStream = argv.save ? fs.createWriteStream(path.resolve(argv.save), { flags: 'a' }) : null;

const client = mqtt.connect(argv.broker, {
  username: argv.username,
  password: argv.password,
  clientId: `eelink-parser-${Math.random().toString(16).slice(2)}`,
  rejectUnauthorized: !argv.insecure,
});

client.on('connect', () => {
  console.log('[MQTT] connected', argv.broker);
  client.subscribe(argv.topic, { qos: 1 }, (err) => {
    if (err) console.error('subscribe error:', err.message);
    else console.log('[MQTT] subscribed:', argv.topic);
  });
});

client.on('message', (topic, payloadBuf) => {
  try {
    const rec = validateAndClean(decodePayload(topic, payloadBuf));
    const line = JSON.stringify(rec);
    console.log(line);
    if (outStream) outStream.write(line + '\n');
  } catch (e) {
    console.error('decode error:', e.message);
  }
});

5) Chạy script và ví dụ đầu ra

Bạn có thể chạy script này bằng cách sử dụng tệp .env hoặc truyền các tham số CLI. Ví dụ:

bash Copy
# sử dụng .env
node parse-eelink-mqtt.js --save out.ndjson

# chỉ định tất cả từ dòng lệnh
node parse-eelink-mqtt.js \
  --broker mqtts://broker.example.com:8883 \
  --username alice --password secret \
  --topic devices/+/up \
  --save out.ndjson --insecure

Khi các thông điệp đến, script sẽ in một bản ghi JSON đã chuẩn hóa cho mỗi thông điệp. Đây là một ví dụ:

json Copy
{"sourceTopic":"devices/ELK-123456/up","rawFormat":"json","deviceId":"ELK-123456","ts":"2025-09-08T06:01:22.000Z","lat":22.543096,"lon":114.057865,"speedKph":48.3,"heading":180,"batteryPct":87,"statusFlags":{"ignOn":true,"moving":true,"tamper":false}}
{"sourceTopic":"devices/ELK-654321/up","rawFormat":"hex|tlv","deviceId":"ELK-654321","ts":"2025-09-08T06:02:10.000Z","lat":22.54301,"lon":114.0579,"speedKph":50.2,"batteryPct":83}

Mỗi dòng đều là JSON hợp lệ, vì vậy bạn có thể trực tiếp đưa NDJSON vào các công cụ phân tích, hệ thống message broker hoặc cơ sở dữ liệu.

6) Cạm bẫy và bước tiếp theo

  • Sử dụng --insecure chỉ để thử nghiệm; trong môi trường sản xuất, bạn nên luôn xác thực chứng chỉ hoặc sử dụng CA đáng tin cậy.
  • Giảm thiểu các thông điệp dựa trên deviceIdts khi ghi vào cơ sở dữ liệu để tránh chèn trùng lặp.
  • Để có thông lượng cao hơn, tránh ghi đồng bộ bên trong bộ xử lý thông điệp; sử dụng hàng đợi hoặc ghi theo lô.
  • Thay thế bộ giải mã TLV ví dụ bằng thông số kỹ thuật giao thức chính thức của thiết bị bạn.

Việc chuẩn hóa dữ liệu uplink tại edge cho phép bạn nhanh chóng tiếp nhận và theo dõi dữ liệu Eelink trong các pipeline dữ liệu của bạn.

Thực tiễn tốt nhất

  • Kiểm tra kỹ lưỡng: Hãy chắc chắn rằng bạn đã kiểm tra các payload khác nhau từ thiết bị của bạn để đảm bảo rằng tất cả đều được xử lý chính xác.
  • Tối ưu hóa hiệu suất: Nếu bạn có khối lượng lớn dữ liệu, hãy xem xét sử dụng batch processing để giảm thiểu thời gian ghi vào cơ sở dữ liệu.

Câu hỏi thường gặp

1. Làm thế nào để cài đặt Node.js?

Bạn có thể tải xuống Node.js từ trang web chính thức nodejs.org và làm theo hướng dẫn cài đặt cho hệ điều hành của bạn.

2. Tôi có thể sử dụng broker MQTT nào?

Có nhiều dịch vụ broker MQTT mà bạn có thể sử dụng, bao gồm Mosquitto, HiveMQ, hoặc dịch vụ đám mây như AWS IoT.

3. Script có hoạt động với các loại dữ liệu khác không?

Script này được thiết kế cho dữ liệu Eelink, nhưng bạn có thể điều chỉnh nó để xử lý các loại dữ liệu khác bằng cách thay đổi các bộ phân tích tương ứng.

Gợi ý câu hỏi phỏng vấn
Không có dữ liệu

Không có dữ liệu

Bài viết được đề xuất
Bài viết cùng tác giả

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào