Edge SDK -- Live Data Service

The LiveDataService interface manages persistent gRPC telemetry streams between the edge adapter and the platform's Live Data Service. It provides both a POJO-based API (recommended for most use cases) and a raw Proto-based API for advanced scenarios.

Table of Contents


Overview

Live telemetry data is the real-time heartbeat of every asset in the Zequent ecosystem. Edge adapters continuously push telemetry -- position, battery, environmental readings, camera state, and more -- to the Live Data Service, where it is broadcast to Client SDK consumers and stored for historical analysis.

The LiveDataService abstracts the complexity of managing gRPC streams. Internally, it creates one BroadcastProcessor per device serial number and subscribes it to the Live Data Service gRPC stub. Streams reconnect automatically on failure with exponential backoff.


How It Works

Your Adapter Code
      |
      v
LiveDataService.produceTelemetryData(TelemetryRequestData)
      |
      v
TelemetryMapper (POJO --> Proto)
      |
      v
BroadcastProcessor (one per device)
      |
      v
gRPC stream --> Live Data Service (platform)

Key characteristics of the implementation:

  • One stream per device. A BroadcastProcessor is created for each unique serial number and reused for all subsequent telemetry pushes.
  • Automatic reconnection. If the gRPC stream fails, it retries with exponential backoff (1s to 30s, 20% jitter, up to 10 attempts). If all retries are exhausted, it schedules a reconnect after 30 seconds.
  • Graceful shutdown. All streams are closed on application shutdown via the @PreDestroy lifecycle callback.
  • Thread-safe. Device-to-processor mappings are stored in a ConcurrentHashMap, and shutdown state is tracked with an AtomicBoolean.

1. Build the Telemetry Data Object

Map data from your device into the TelemetryRequestData POJO:

import com.zequent.framework.edge.sdk.models.TelemetryRequestData;
import com.zequent.framework.common.proto.LiveDataType;
import com.zequent.framework.utils.edge.sdk.dto.AssetTelemetryData;
import java.time.LocalDateTime;

TelemetryRequestData data = TelemetryRequestData.builder()
    .sn("YOUR_DEVICE_SN")
    .tid(UUID.randomUUID().toString())
    .assetId("asset-uuid")
    .timestamp(LocalDateTime.now())
    .type(LiveDataType.ASSET_TELEMETRY)
    .assetTelemetry(assetTelemetry)  // see models section below
    .build();

2. Send It

Reactive (Mutiny):

import com.zequent.framework.edge.sdk.interfaces.LiveDataService;

private final LiveDataService liveDataService;

public void sendTelemetry(TelemetryRequestData data) {
    liveDataService.produceTelemetryData(data)
        .subscribe().with(
            ignored -> log.debug("Telemetry sent for {}", data.getSn()),
            err -> log.error("Error sending telemetry", err)
        );
}

Future-based (CompletionStage):

public void sendTelemetry(TelemetryRequestData data) {
    liveDataService.produceTelemetryData(data)
        .subscribeAsCompletionStage()
        .thenRun(() -> log.debug("Telemetry sent"))
        .exceptionally(err -> {
            log.error("Error sending telemetry", err);
            return null;
        });
}

Proto-based API (Advanced)

For scenarios where you need full control over the Proto message (e.g., you already have telemetry data in Proto format from another system), use the direct Proto API:

import com.zequent.framework.services.livedata.proto.ProduceTelemetryRequest;

ProduceTelemetryRequest protoRequest = ProduceTelemetryRequest.newBuilder()
    .setBase(RequestBase.newBuilder()
        .setSn("YOUR_DEVICE_SN")
        .setTid(UUID.randomUUID().toString())
        .setTimestamp(ProtobufHelpers.now())
        .build())
    // ... set telemetry fields
    .build();

liveDataService.produceTelemetry("YOUR_DEVICE_SN", protoRequest)
    .subscribe().with(
        ignored -> log.debug("Proto telemetry sent"),
        err -> log.error("Error", err)
    );

Both APIs share the same underlying stream infrastructure, so there is no performance difference.


Stream Management

Close a Single Device Stream

liveDataService.closeStream("YOUR_DEVICE_SN")
    .subscribe().with(
        ignored -> log.info("Stream closed for device"),
        err -> log.error("Error closing stream", err)
    );

Close All Streams

Typically called during shutdown, but can be triggered manually:

liveDataService.closeAllStreams()
    .subscribe().with(
        ignored -> log.info("All streams closed"),
        err -> log.error("Error closing streams", err)
    );

The LiveDataServiceImpl also registers a @PreDestroy callback that automatically closes all streams with a 10-second timeout on application shutdown.


Telemetry Data Models

TelemetryRequestData

The top-level wrapper that carries one telemetry payload:

FieldTypeDescription
tidStringTransaction identifier for tracing
snStringDevice serial number
assetIdStringPlatform asset identifier
timestampLocalDateTimeWhen the telemetry was recorded
typeLiveDataTypeASSET_TELEMETRY or SUBASSET_TELEMETRY
assetTelemetryAssetTelemetryDataAsset-level telemetry (set when type is ASSET_TELEMETRY)
subAssetTelemetrySubAssetTelemetryDataSub-asset-level telemetry (set when type is SUBASSET_TELEMETRY)

AssetTelemetryData

Telemetry data for the primary asset (e.g., a dock or ground station):

FieldTypeDescription
idStringAsset telemetry identifier
timestampLocalDateTimeMeasurement timestamp
latitudeFloatGPS latitude
longitudeFloatGPS longitude
absoluteAltitudeFloatAltitude above sea level (meters)
relativeAltitudeFloatAltitude above takeoff point (meters)
environmentTempFloatAmbient temperature (Celsius)
insideTempFloatInternal temperature (Celsius)
humidityFloatHumidity percentage
modeAssetModesCurrent operational mode
rainfallRainfallEnumRainfall condition
headingFloatHeading in degrees
debugModeOpenBooleanWhether debug mode is active
hasActiveManualControlSessionBooleanWhether manual control is active
coverStateAssetCoverStateEnumDock cover state (open/closed/moving)
workingVoltageIntegerSystem voltage (mV)
workingCurrentIntegerSystem current (mA)
supplyVoltageIntegerSupply voltage (mV)
windSpeedFloatWind speed (m/s)
positionValidBooleanWhether GPS position is valid
manualControlStateManualControlStateEnumDRC manual control state
subAssetInformationSubAssetInformationPaired sub-asset info (sn, model, paired, online)
subAssetAtHomeBooleanWhether the sub-asset is at the dock
subAssetChargingBooleanWhether the sub-asset is charging
subAssetPercentageFloatSub-asset battery percentage
networkInformationNetworkInformationNetwork type, rate, quality
airConditionerAirConditionerAir conditioner state and switch time

SubAssetTelemetryData

Telemetry data for a sub-asset (e.g., a drone):

FieldTypeDescription
idStringSub-asset telemetry identifier
timestampLocalDateTimeMeasurement timestamp
latitudeFloatGPS latitude
longitudeFloatGPS longitude
absoluteAltitudeFloatAltitude above sea level (meters)
relativeAltitudeFloatAltitude above takeoff point (meters)
horizontalSpeedFloatHorizontal speed (m/s)
verticalSpeedFloatVertical speed (m/s)
windSpeedFloatWind speed (m/s)
windDirectionStringWind direction
headingFloatHeading in degrees
gearIntegerLanding gear state
heightLimitIntegerMaximum height limit (meters)
homeDistanceFloatDistance to home point (meters)
totalMovementDistanceDoubleTotal distance traveled (meters)
totalMovementTimeDoubleTotal flight time (seconds)
modeSubAssetModeCurrent flight mode
countryStringCountry code
batteryInformationBatteryInformationBattery percentage, remaining time, RTH power
payloadTelemetryPayloadTelemetryCamera, rangefinder, and sensor data

PayloadTelemetry

FieldTypeDescription
idStringPayload identifier
timestampLocalDateTimeMeasurement timestamp
nameStringPayload name
cameraDataCameraDataCurrent lens, gimbal pitch/yaw, zoom factor
rangeFinderDataRangeFinderDataTarget lat/lon/distance/altitude
sensorDataSensorDataTarget temperature

Configuration

The Live Data Service gRPC client is configured in application.properties:

quarkus.grpc.clients.live-data-service.host=localhost
quarkus.grpc.clients.live-data-service.port=8003
quarkus.grpc.clients.live-data-service.keep-alive-without-calls=true

For container deployments:

quarkus.grpc.clients.live-data-service.host=live-data-service
quarkus.grpc.clients.live-data-service.port=8003

See the Configuration Guide for the complete reference.


Best Practices

  1. Send telemetry at a reasonable frequency. Sending too fast can overwhelm the gRPC stream and the platform. For most assets, 1-5 Hz is appropriate. For OSD (on-screen display) data from drones, the typical rate is 2 Hz.

  2. Use the POJO API unless you have a specific reason not to. The TelemetryMapper handles all Proto conversions for you, including timestamp mapping between LocalDateTime and Protobuf Timestamp.

  3. Set the correct LiveDataType. Use ASSET_TELEMETRY for dock/station data and SUBASSET_TELEMETRY for drone/vehicle data. This determines how the data is routed and stored on the platform.

  4. Include a transaction ID. Setting tid on every telemetry message enables end-to-end tracing across the system.

  5. Do not manually manage streams. Let the SDK handle stream creation, reconnection, and teardown. If you need to reset a stream, call closeStream(deviceSn) and the next produceTelemetry call will create a new one automatically.

  6. Handle the shutdown gracefully. If your adapter has its own shutdown logic, call closeAllStreams() before tearing down other resources. The SDK does this automatically via @PreDestroy, but explicit ordering can prevent race conditions.

Was this page helpful?