Real-time and Reactive Patterns with Netflix DGS

Part 4 of 7 in the “Production GraphQL with Netflix DGS” series
REST APIs are request-response: the client asks, the server answers, the connection closes. GraphQL subscriptions break that pattern — the server pushes updates to the client as they happen. This article covers how DGS implements subscriptions, how reactive types work throughout the framework, and how to observe it all in production.
GraphQL Subscriptions
Subscriptions let clients receive a stream of updates over a persistent connection. Common use cases: live scoreboards, progress tracking, notifications, collaborative editing.
Schema
Subscriptions are defined alongside queries and mutations:
type Subscription {
  orderStatusUpdates(orderId: ID!): OrderStatusUpdate
  inventoryAlerts(warehouseId: ID!): InventoryAlert
}

type OrderStatusUpdate {
  orderId: ID!
  status: OrderStatus!
  timestamp: DateTime!
  message: String
}
Server Implementation
A DGS subscription returns a Publisher<T> — typically a Reactor Flux:
@DgsComponent
@RequiredArgsConstructor
public class OrderSubscriptionController {

    private final OrderEventPublisher orderEventPublisher;
    private final CurrentUserProvider currentUser;

    @DgsSubscription
    public Publisher<OrderStatusUpdate> orderStatusUpdates(
            @InputArgument String orderId) {
        // Adapt each domain event to the GraphQL type before it is pushed to the client
        return Flux.from(orderEventPublisher.subscribe(orderId))
                .map(event -> OrderStatusUpdate.newBuilder()
                        .orderId(event.orderId())
                        .status(mapStatus(event.status()))
                        .timestamp(event.timestamp())
                        .message(event.message())
                        .build());
    }
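}
The flow: the client subscribes with an orderId, orderEventPublisher emits events for that order, and each event is mapped to an OrderStatusUpdate and pushed to the client.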
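To make the Publisher contract concrete, here is a minimal, framework-free sketch of what an event source such as orderEventPublisher might look like internally. It uses the JDK's java.util.concurrent.Flow API as a stand-in for Reactive Streams. The names OrderEventBus, OrderEvent, publish, complete and demo are hypothetical and are not part of DGS or of the code shown above.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical domain event; the real event type is not shown in the article.
record OrderEvent(String orderId, String status) {}

// Hypothetical in-memory event bus: one hot publisher per order ID.
class OrderEventBus {
    private final Map<String, SubmissionPublisher<OrderEvent>> topics = new ConcurrentHashMap<>();

    // Returns the stream a subscription resolver would hand back for this order.
    Flow.Publisher<OrderEvent> subscribe(String orderId) {
        return topic(orderId);
    }

    // Pushes an event to every current subscriber of that order.
    void publish(OrderEvent event) {
        topic(event.orderId()).submit(event);
    }

    // Signals onComplete to subscribers, which ends their subscription.
    void complete(String orderId) {
        SubmissionPublisher<OrderEvent> publisher = topics.remove(orderId);
        if (publisher != null) {
            publisher.close();
        }
    }

    private SubmissionPublisher<OrderEvent> topic(String orderId) {
        return topics.computeIfAbsent(orderId, id -> new SubmissionPublisher<>());
    }

    // Demo: subscribe, publish two events, complete, and return the statuses received.
    static List<String> demo() {
        OrderEventBus bus = new OrderEventBus();
        List<String> received = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = new CompletableFuture<>();
        bus.subscribe("order-1").subscribe(new Flow.Subscriber<OrderEvent>() {
            public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(OrderEvent e) { received.add(e.status()); }
            public void onError(Throwable t) { done.completeExceptionally(t); }
            public void onComplete() { done.complete(null); }
        });
        bus.publish(new OrderEvent("order-1", "SHIPPED"));
        bus.publish(new OrderEvent("order-1", "DELIVERED"));
        bus.complete("order-1");
        done.join(); // wait until onComplete has been delivered
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The stream stays open until complete(...) is called, which is the property that distinguishes a subscription from a query: the subscriber keeps receiving onNext signals for as long as the source has events.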
Transport: It Just Works
Since DGS 10.0, subscription transport is handled entirely by Spring for GraphQL. You don’t need custom WebSocket handlers — the graphql-dgs-spring-graphql-starter dependency includes native support for the graphql-transport-ws protocol.
The only infrastructure you need for WebFlux applications is a WebSocketService bean and the WebSocketHandlerAdapter that uses it:
@Configuration
public class WebSocketConfig {

    @Bean
    public WebSocketService webSocketService() {
        return new HandshakeWebSocketService(
                new ReactorNettyRequestUpgradeStrategy());
    }

    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter(WebSocketService service) {
        return new WebSocketHandlerAdapter(service);
    }
}
Subscriptions are served at the same /graphql endpoint as queries and mutations; the protocol upgrade is handled automatically.
Earlier DGS versions required custom WebSocket handlers and a dedicated /subscriptions endpoint. If you’re migrating from pre-10.0, you can remove those — they’ll actually interfere with the native implementation.
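On the wire, the graphql-transport-ws protocol is a small set of JSON messages. The sketch below builds the client-side ones (connection_init, subscribe, complete) as plain strings, using only the JDK. The class name TransportWsMessages and the choice to carry a bearer token under an Authorization key in the connection_init payload are assumptions for illustration: the protocol leaves that payload free-form, and whether the server reads it depends on how your application is configured.

```java
// Hypothetical helper that builds client-side graphql-transport-ws messages as JSON text.
final class TransportWsMessages {
    private TransportWsMessages() {}

    // First message after the WebSocket opens. The payload is free-form;
    // putting a token under "Authorization" is an assumption of this sketch.
    static String connectionInit(String bearerToken) {
        return "{\"type\":\"connection_init\",\"payload\":{\"Authorization\":"
                + quote("Bearer " + bearerToken) + "}}";
    }

    // Starts one subscription. The id lets the client match later "next" and "complete" messages.
    static String subscribe(String id, String query) {
        return "{\"id\":" + quote(id) + ",\"type\":\"subscribe\",\"payload\":{\"query\":"
                + quote(query) + "}}";
    }

    // Client-initiated stop for a running subscription.
    static String complete(String id) {
        return "{\"id\":" + quote(id) + ",\"type\":\"complete\"}";
    }

    // Minimal JSON string escaping (backslashes, quotes, newlines) for this sketch only.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n") + "\"";
    }

    public static void main(String[] args) {
        System.out.println(connectionInit("example-token"));
        System.out.println(subscribe("1",
                "subscription { orderStatusUpdates(orderId: \"42\") { status message } }"));
        System.out.println(complete("1"));
    }
}
```

A client would open the WebSocket with the graphql-transport-ws subprotocol, send the connection_init message, wait for the server's connection_ack, and then send subscribe. Results arrive as next messages carrying the same id.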
Authentication in Subscriptions
Subscriptions run in a different execution context than queries. The initial HTTP request carries auth headers, but subsequent messages over the WebSocket may not. Handle this at subscription setup time:
@DgsSubscription
public Publisher<ScoreUpdate> liveScores(@InputArgument String boardId) {
    // Get authentication during the initial subscription request
    return currentUser.getUserId()
            .flatMapMany(userId -> {
                log.info("Subscription started",
                        kv("boardId", boardId), kv("userId", userId));
                return eventPublisher.subscribe(boardId)
                        .map(this::mapToGraphQL)
                        .doOnComplete(() -> log.info("Subscription completed",
                                kv("boardId", boardId), kv("userId", userId)));
            });
}
For public subscriptions (like a live sports scoreboard or public event feed), authentication is optional:
@DgsSubscription
public Publisher<ScoreUpdate> publicScores(@InputArgument String boardId) {
    // No @PreAuthorize: this is intentionally public
    return eventPublisher.subscribe(boardId)
            .map(this::mapToGraphQL);
}
The Reactive Type Strategy
DGS data fetchers need to return asynchronous results. Two options: CompletableFuture<T> and Mono<T>. Both work, and each has trade-offs.
[Diagram: choosing a return type by what the data fetcher does. Uses DataLoaders: return CompletableFuture<T>. Reactive repos, no DataLoaders: return Mono<T>. Streams multiple items: return Publisher<T> / Flux<T> (subscriptions only, via @DgsSubscription).]
Why CompletableFuture for Data Fetchers
Most DGS data fetchers should return CompletableFuture<T>:
@DgsQuery
public CompletableFuture<Product> product(@InputArgument String id) {
    return productService.findById(id).toFuture();
}
The reason is DataLoader compatibility. The DataLoader API is built around CompletionStage (the interface that CompletableFuture implements): batch loaders return a CompletionStage, and dataLoader.load(key) hands the field resolver a CompletableFuture. If your resolver returns Mono but internally uses DataLoaders, you end up converting between types:
// The DataLoader already returns a future; wrapping it in Mono is unnecessary overhead
DataLoader<String, Product> loader = dfe.getDataLoader("products");
return Mono.fromCompletionStage(loader.load(productId)); // Mono just to unwrap a CompletionStage
CompletableFuture as the return type keeps the async chain consistent.
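The batching behaviour that makes DataLoaders valuable is easiest to see in miniature. The following is a simplified, hypothetical stand-in: TinyBatchLoader is not the java-dataloader API and is not thread-safe. Its load() returns a CompletableFuture immediately, and dispatch() resolves every queued key with a single batch call. The resolver-style code in demo() stays in CompletableFuture from start to finish, with no Mono conversion.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical, simplified stand-in for a DataLoader (single-threaded sketch).
class TinyBatchLoader<K, V> {
    private final Function<List<K>, Map<K, V>> batchFn;
    private final Map<K, CompletableFuture<V>> pending = new LinkedHashMap<>();
    int batchCalls = 0; // exposed for the demo only

    TinyBatchLoader(Function<List<K>, Map<K, V>> batchFn) {
        this.batchFn = batchFn;
    }

    // Queues the key and returns a future. Asking for the same key twice
    // returns the same future, so duplicates are fetched once.
    CompletableFuture<V> load(K key) {
        return pending.computeIfAbsent(key, k -> new CompletableFuture<>());
    }

    // Resolves all queued keys with one call to the batch function.
    void dispatch() {
        if (pending.isEmpty()) {
            return;
        }
        Map<K, CompletableFuture<V>> batch = new LinkedHashMap<>(pending);
        pending.clear();
        batchCalls++;
        Map<K, V> values = batchFn.apply(new ArrayList<>(batch.keySet()));
        batch.forEach((key, future) -> future.complete(values.get(key)));
    }

    static String demo() {
        TinyBatchLoader<String, String> loader = new TinyBatchLoader<>(keys -> {
            Map<String, String> out = new LinkedHashMap<>();
            for (String key : keys) {
                out.put(key, "Product " + key);
            }
            return out;
        });
        // Resolver-style code: compose futures directly, no reactive wrapper needed.
        CompletableFuture<String> first = loader.load("p1").thenApply(String::toUpperCase);
        CompletableFuture<String> second = loader.load("p2");
        CompletableFuture<String> both = first.thenCombine(second, (a, b) -> a + " + " + b);
        loader.dispatch();
        return both.join() + " in " + loader.batchCalls + " batch call(s)";
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because every step already speaks CompletableFuture, a resolver can return the composed future as-is, which is the consistency argument made above.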
When Mono<T> Makes Sense
Return Mono<T> directly when:
- You’re using reactive repositories or WebClient calls
- No DataLoaders are involved
- You want to compose multiple reactive operations
@DgsQuery
@PreAuthorize("isAuthenticated()")
public Mono<ProductPage> myProducts(
        @InputArgument Integer pageNumber,
        @InputArgument Integer pageSize) {
    return currentUser.getUserId()
            .flatMap(userId -> productService.findByOwner(userId, pageNumber, pageSize));
}
DGS handles both return types transparently; you don’t need adapters or configuration.
Subscriptions Always Use Publisher
Subscriptions must return Publisher<T> (the Reactive Streams interface). In practice, this means Flux<T>:
@DgsSubscription
public Publisher<ProgressUpdate> importProgress(@InputArgument String jobId) {
    return Flux.from(progressPublisher.subscribe(jobId))
            .map(this::mapToGraphQL);
}
Mono<T> would emit at most one item and complete, which is not useful for a subscription.
Observability: Measuring What Matters
You can’t optimize what you can’t measure. An AOP-based metrics aspect gives you visibility into every GraphQL operation without modifying business code.
Operation Metrics
Wrap @DgsQuery and @DgsMutation methods with timing and counting:
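@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
public class GraphQLMetrics {

    private static final Duration SLOW_THRESHOLD = Duration.ofSeconds(2);

    private final MeterRegistry meterRegistry;

    @Around("@annotation(com.netflix.graphql.dgs.DgsQuery) || " +
            "@annotation(com.netflix.graphql.dgs.DgsMutation)")
    public Object measureOperation(ProceedingJoinPoint joinPoint) throws Throwable {
        String operationName = joinPoint.getSignature().getName();
        String operationType = isQuery(joinPoint) ? "query" : "mutation";
        long startTime = System.nanoTime();
        Object result = joinPoint.proceed();

        // Reactive return type: record when the Mono terminates, not when it is assembled
        if (result instanceof Mono<?> mono) {
            return mono
                    .doOnSuccess(v -> recordSuccess(operationType, operationName, startTime))
                    .doOnError(e -> recordError(operationType, operationName, e, startTime));
        }
        // CompletableFuture return type: record when the future completes
        if (result instanceof CompletableFuture<?> future) {
            return future.whenComplete((v, e) -> {
                if (e == null) {
                    recordSuccess(operationType, operationName, startTime);
                } else {
                    recordError(operationType, operationName, e, startTime);
                }
            });
        }
        // Synchronous result
        recordSuccess(operationType, operationName, startTime);
        return result;
    }

    private boolean isQuery(ProceedingJoinPoint joinPoint) {
        // MethodSignature comes from org.aspectj.lang.reflect
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        return signature.getMethod().isAnnotationPresent(DgsQuery.class);
    }

    private void recordSuccess(String type, String name, long startTime) {
        Duration duration = Duration.ofNanos(System.nanoTime() - startTime);
        meterRegistry.timer("graphql.operation.duration",
                "operation_type", type,
                "operation_name", name,
                "status", "success"
        ).record(duration);
        meterRegistry.counter("graphql.operation.processed",
                "operation_type", type,
                "operation_name", name
        ).increment();
        if (duration.compareTo(SLOW_THRESHOLD) > 0) {
            log.warn("Slow GraphQL operation",
                    kv("operation", name),
                    kv("type", type),
                    kv("durationMs", duration.toMillis()));
        }
    }

    private void recordError(String type, String name, Throwable error, long startTime) {
        Duration duration = Duration.ofNanos(System.nanoTime() - startTime);
        meterRegistry.timer("graphql.operation.duration",
                "operation_type", type, "operation_name", name, "status", "error"
        ).record(duration);
        meterRegistry.counter("graphql.operation.errors",
                "operation_type", type,
                "operation_name", name,
                "error_type", error.getClass().getSimpleName()
        ).increment();
    }
}
This gives you three metrics per operation: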
- graphql.operation.duration: histogram of execution times, tagged by operation name and type
- graphql.operation.processed: counter of successful operations
- graphql.operation.errors: counter of failed operations, tagged by error type
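The non-blocking timing idea can be shown without Spring or Micrometer. This sketch assumes a hypothetical AsyncTimingSketch class whose in-memory counters stand in for a MeterRegistry. It attaches a whenComplete callback to a CompletableFuture, so the outcome is recorded when the future settles rather than when the method returns.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;

// Hypothetical in-memory stand-in for a metrics registry (not Micrometer).
class AsyncTimingSketch {
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> totalNanos = new ConcurrentHashMap<>();

    // Wraps an async call. The callback runs when the future settles,
    // so the calling thread is never blocked by the measurement.
    <T> CompletableFuture<T> timed(String name, Supplier<CompletableFuture<T>> call) {
        long start = System.nanoTime();
        return call.get().whenComplete((value, error) -> {
            String status = (error == null) ? "success" : "error";
            add(counters, name + ":" + status, 1);
            add(totalNanos, name, System.nanoTime() - start);
        });
    }

    // Number of recorded outcomes for a key such as "product:success".
    long count(String key) {
        LongAdder adder = counters.get(key);
        return adder == null ? 0 : adder.sum();
    }

    private static void add(Map<String, LongAdder> map, String key, long amount) {
        map.computeIfAbsent(key, k -> new LongAdder()).add(amount);
    }

    public static void main(String[] args) {
        AsyncTimingSketch metrics = new AsyncTimingSketch();
        metrics.timed("product", () -> CompletableFuture.supplyAsync(() -> "ok")).join();
        System.out.println("successes: " + metrics.count("product:success"));
    }
}
```

The same shape applies to Mono with doOnSuccess and doOnError: the measurement is attached to the asynchronous result instead of being taken at the point where the method hands that result back.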
Why AOP Over DGS Instrumentation
DGS supports graphql-java’s Instrumentation interface for metrics. However, AOP has advantages:
- Cleaner separation: metrics code doesn’t mix with GraphQL execution internals.
- Access to Spring context: AOP can inspect method signatures, annotations, and dependency context.
- Reactive awareness: the aspect can detect Mono/Flux returns and attach callbacks without blocking.
The trade-off is that AOP operates at the Spring method level, not the GraphQL field level. If you need per-field metrics (e.g., which specific field in a query is slow), use Instrumentation. For operation-level observability, AOP is simpler and sufficient.
Structured Logging for GraphQL
Every GraphQL operation should produce a structured log entry that’s queryable in your log aggregator:
log.info("GraphQL operation completed",
        kv("operation", "products.search"),
        kv("type", "query"),
        kv("durationMs", 42),
        kv("resultCount", 20),
        kv("searchText", "electronics"));
With a JSON log encoder configured, this produces an entry in Cloud Logging / ELK / Datadog that you can filter by field:
{
  "message": "GraphQL operation completed",
  "operation": "products.search",
  "type": "query",
  "durationMs": 42,
  "resultCount": 20,
  "searchText": "electronics"
}
The key principle: structured key-value pairs, not string interpolation. log.info("Search took {}ms for query '{}'", duration, searchText) produces an unstructured string that a log aggregator can only search by text match, not filter or aggregate by field.
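To show what the structured form buys you, here is a dependency-free sketch of how key-value pairs can end up as a JSON log entry. StructuredLogSketch, Kv and toJson are hypothetical names; the kv(...) here only mirrors the calls above in spirit, and a real application would rely on its logging encoder rather than hand-built JSON.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical stand-in for a structured logging encoder.
class StructuredLogSketch {
    // One key-value pair attached to a log statement.
    record Kv(String key, Object value) {}

    static Kv kv(String key, Object value) {
        return new Kv(key, value);
    }

    // Renders the message and its fields as a single JSON object, in insertion order.
    static String toJson(String message, Kv... fields) {
        Map<String, Object> entry = new LinkedHashMap<>();
        entry.put("message", message);
        for (Kv field : fields) {
            entry.put(field.key(), field.value());
        }
        StringJoiner json = new StringJoiner(",", "{", "}");
        entry.forEach((key, value) -> json.add(quote(key) + ":" + render(value)));
        return json.toString();
    }

    // Numbers and booleans stay unquoted so the aggregator can compare them numerically.
    private static String render(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof Number || value instanceof Boolean) {
            return value.toString();
        }
        return quote(value.toString());
    }

    // Minimal escaping (backslashes and quotes) for this sketch only.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        System.out.println(toJson("GraphQL operation completed",
                kv("operation", "products.search"),
                kv("type", "query"),
                kv("durationMs", 42)));
    }
}
```

Because durationMs is emitted as a number rather than embedded in a sentence, a query such as "durationMs greater than 2000" becomes a field filter instead of a regular expression.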
What’s Next
In Part 5, we wrap up the series with the concerns that emerge as your GraphQL API grows: testing strategies, schema evolution, and federation — stitching multiple services into a single unified graph.


