
Real-time and Reactive Patterns with Netflix DGS


Part 4 of 7 in the “Production GraphQL with Netflix DGS” series


REST APIs are request-response: the client asks, the server answers, the connection closes. GraphQL subscriptions break that pattern — the server pushes updates to the client as they happen. This article covers how DGS implements subscriptions, how reactive types work throughout the framework, and how to observe it all in production.

GraphQL Subscriptions

Subscriptions let clients receive a stream of updates over a persistent connection. Common use cases: live scoreboards, progress tracking, notifications, collaborative editing.

Schema

Subscriptions are defined alongside queries and mutations:

GRAPHQL
type Subscription {
    orderStatusUpdates(orderId: ID!): OrderStatusUpdate
    inventoryAlerts(warehouseId: ID!): InventoryAlert
}

type OrderStatusUpdate {
    orderId: ID!
    status: OrderStatus!
    timestamp: DateTime!
    message: String
}
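Once this is deployed, a client opens the stream with an ordinary subscription operation document (field names taken from the schema above):

```graphql
subscription OnOrderStatus($orderId: ID!) {
    orderStatusUpdates(orderId: $orderId) {
        orderId
        status
        timestamp
        message
    }
}
```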

Server Implementation

A DGS subscription returns a Publisher<T> — typically a Reactor Flux:

Java
@DgsComponent
@RequiredArgsConstructor
public class OrderSubscriptionController {

    private final OrderEventPublisher orderEventPublisher;
    private final CurrentUserProvider currentUser;

    @DgsSubscription
    public Publisher<OrderStatusUpdate> orderStatusUpdates(
            @InputArgument String orderId) {

        return Flux.from(orderEventPublisher.subscribe(orderId))
                .map(event -> OrderStatusUpdate.newBuilder()
                        .orderId(event.orderId())
                        .status(mapStatus(event.status()))
                        .timestamp(event.timestamp())
                        .message(event.message())
                        .build());
    }
}
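The OrderEventPublisher is whatever event source your system provides. As a dependency-free illustration of the shape it needs, here is a hypothetical in-memory version built on java.util.concurrent.SubmissionPublisher. Note the caveat: it yields a Flow.Publisher, which would need adapting to a Reactive Streams Publisher (for example with Reactor's JdkFlowAdapter.flowPublisherToFlux) before a @DgsSubscription could return it, and a production system would typically sit on Kafka, Redis pub/sub, or similar instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical in-memory event bus standing in for OrderEventPublisher.
public class InMemoryOrderEvents {

    public record OrderEvent(String orderId, String status) {}

    // One topic per orderId, created lazily on first subscribe.
    private final Map<String, SubmissionPublisher<OrderEvent>> topics =
            new ConcurrentHashMap<>();

    public Flow.Publisher<OrderEvent> subscribe(String orderId) {
        return topics.computeIfAbsent(orderId, id -> new SubmissionPublisher<>());
    }

    public void publish(OrderEvent event) {
        SubmissionPublisher<OrderEvent> topic = topics.get(event.orderId());
        if (topic != null) {
            topic.submit(event); // delivered asynchronously to all subscribers
        }
    }

    public void complete(String orderId) {
        SubmissionPublisher<OrderEvent> topic = topics.remove(orderId);
        if (topic != null) {
            topic.close(); // signals onComplete, ending every subscription
        }
    }
}
```

Calling complete(orderId) closes the topic and ends each client's stream with an onComplete signal, the same termination path described for the Flux above.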

The flow:

sequenceDiagram
    participant Client
    participant WS as WebSocket
    participant DGS
    participant Sub as DgsSubscription
    participant Pub as EventPublisher
    Client->>WS: Subscribe via graphql-transport-ws
    WS->>DGS: Subscription request
    DGS->>Sub: orderStatusUpdates(orderId)
    Sub->>Pub: subscribe(orderId)
    Pub-->>Sub: Flux of Events
    loop Each event
        Pub-->>Sub: Event emitted
        Sub-->>DGS: OrderStatusUpdate
        DGS-->>WS: Serialized JSON
        WS-->>Client: Push update
    end
    Note over Client,Pub: Ends on Flux complete or client disconnect

Transport: It Just Works

Since DGS 10.0, subscription transport is handled entirely by Spring for GraphQL. You don’t need custom WebSocket handlers — the graphql-dgs-spring-graphql-starter dependency includes native support for the graphql-transport-ws protocol.

The only infrastructure you need is a WebSocketService bean for WebFlux applications:

Java
@Configuration
public class WebSocketConfig {

    @Bean
    public WebSocketService webSocketService() {
        return new HandshakeWebSocketService(
                new ReactorNettyRequestUpgradeStrategy());
    }

    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter(WebSocketService service) {
        return new WebSocketHandlerAdapter(service);
    }
}

Subscriptions are served at the same /graphql endpoint as queries and mutations — the protocol upgrade is handled automatically.
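If you do want to tune the endpoint or handshake, Spring for GraphQL exposes this through standard Spring Boot properties rather than code. A sketch, with illustrative values:

```yaml
spring:
  graphql:
    path: /graphql                    # HTTP endpoint for queries and mutations
    websocket:
      path: /graphql                  # WebSocket endpoint for subscriptions
      connection-init-timeout: 60s    # how long to wait for the client's connection_init message
```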

Earlier DGS versions required custom WebSocket handlers and a dedicated /subscriptions endpoint. If you’re migrating from pre-10.0, you can remove those — they’ll actually interfere with the native implementation.

Authentication in Subscriptions

Subscriptions run in a different execution context than queries. The initial HTTP request carries auth headers, but subsequent messages over the WebSocket may not. Handle this at subscription setup time:

Java
@DgsSubscription
public Publisher<ScoreUpdate> liveScores(@InputArgument String boardId) {
    // Get authentication during the initial subscription request
    return currentUser.getUserId()
            .flatMapMany(userId -> {
                log.info("Subscription started",
                        kv("boardId", boardId), kv("userId", userId));

                return eventPublisher.subscribe(boardId)
                        .map(this::mapToGraphQL)
                        .doOnComplete(() -> log.info("Subscription completed",
                                kv("boardId", boardId), kv("userId", userId)));
            });
}

For public subscriptions (like a live sports scoreboard or public event feed), authentication is optional:

Java
@DgsSubscription
public Publisher<ScoreUpdate> publicScores(@InputArgument String boardId) {
    // No @PreAuthorize — this is intentionally public
    return eventPublisher.subscribe(boardId)
            .map(this::mapToGraphQL);
}

The Reactive Type Strategy

DGS data fetchers need to return asynchronous results. Two options: CompletableFuture<T> and Mono<T>. Both work, and each has trade-offs.

flowchart TD
    Q{"What does your data fetcher do?"} -->|Uses DataLoaders| CF["Return CompletableFuture&lt;T&gt;"]
    Q -->|Reactive repos, no DataLoaders| M["Return Mono&lt;T&gt;"]
    Q -->|Streams multiple items| F["Return Publisher&lt;T&gt; / Flux&lt;T&gt;"]
    F -.->|Subscriptions only| SUB["@DgsSubscription"]
    style CF fill:#4a9eff,stroke:#2171c7,color:#fff
    style M fill:#00bfa5,stroke:#00897b,color:#fff
    style F fill:#7c4dff,stroke:#5e35b1,color:#fff
    style SUB fill:#7c4dff,stroke:#5e35b1,color:#fff

Why CompletableFuture for Data Fetchers

Most DGS data fetchers should return CompletableFuture<T>:

Java
@DgsQuery
public CompletableFuture<Product> product(@InputArgument String id) {
    return productService.findById(id).toFuture();
}

The reason is DataLoader compatibility. The DataLoader API is built around CompletionStage (the supertype of CompletableFuture). When a field resolver calls dataLoader.load(key), it gets a CompletionStage. If your resolver returns Mono but internally uses DataLoaders, you end up converting between types:

Java
// DataLoader returns CompletionStage — wrapping in Mono is unnecessary overhead
DataLoader<String, Product> loader = dfe.getDataLoader("products");
return Mono.fromCompletionStage(loader.load(productId));  // Mono just to unwrap a CompletionStage

CompletableFuture as the return type keeps the async chain consistent.
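To see the consistency pay off, here is a small dependency-free sketch. The loadProductName and loadStockLevel methods are hypothetical stand-ins for dataLoader.load(key) calls; thenCombine composes them without ever leaving the CompletableFuture world:

```java
import java.util.concurrent.CompletableFuture;

public class AsyncComposition {

    // Hypothetical stand-ins for dataLoader.load(key), which returns a CompletionStage
    static CompletableFuture<String> loadProductName(String id) {
        return CompletableFuture.supplyAsync(() -> "Product " + id);
    }

    static CompletableFuture<Integer> loadStockLevel(String id) {
        return CompletableFuture.supplyAsync(() -> 42);
    }

    // The whole chain stays in CompletableFuture; no Mono conversions needed
    public static CompletableFuture<String> productSummary(String id) {
        return loadProductName(id)
                .thenCombine(loadStockLevel(id),
                        (name, stock) -> name + " (in stock: " + stock + ")");
    }
}
```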

When Mono<T> Makes Sense

Return Mono<T> directly when:

  • You’re using reactive repositories or WebClient calls
  • No DataLoaders are involved
  • You want to compose multiple reactive operations
Java
@DgsQuery
@PreAuthorize("isAuthenticated()")
public Mono<ProductPage> myProducts(
        @InputArgument Integer pageNumber,
        @InputArgument Integer pageSize) {

    return currentUser.getUserId().flatMap(userId -> {
        return productService.findByOwner(userId, pageNumber, pageSize);
    });
}

DGS handles both return types transparently — you don’t need adapters or configuration.

Subscriptions Always Use Publisher

Subscriptions must return Publisher<T> (the Reactive Streams interface). In practice, this means Flux<T>:

Java
@DgsSubscription
public Publisher<ProgressUpdate> importProgress(@InputArgument String jobId) {
    return Flux.from(progressPublisher.subscribe(jobId))
            .map(this::mapToGraphQL);
}

Mono<T> would emit at most one item and complete — not useful for a subscription.

Observability: Measuring What Matters

You can’t optimize what you can’t measure. An AOP-based metrics aspect gives you visibility into every GraphQL operation without modifying business code.

Operation Metrics

Wrap @DgsQuery and @DgsMutation methods with timing and counting:

Java
@Aspect
@Component
@Slf4j
@RequiredArgsConstructor
public class GraphQLMetrics {

    private final MeterRegistry meterRegistry;
    private static final Duration SLOW_THRESHOLD = Duration.ofSeconds(2);

    @Around("@annotation(com.netflix.graphql.dgs.DgsQuery) || " +
            "@annotation(com.netflix.graphql.dgs.DgsMutation)")
    public Object measureOperation(ProceedingJoinPoint joinPoint) throws Throwable {
        String operationName = joinPoint.getSignature().getName();
        String operationType = isQuery(joinPoint) ? "query" : "mutation";
        long startTime = System.nanoTime();

        Object result = joinPoint.proceed();

        // Handle reactive return types
        if (result instanceof Mono<?> mono) {
            return mono
                    .doOnSuccess(v -> recordSuccess(operationType, operationName, startTime))
                    .doOnError(e -> recordError(operationType, operationName, e, startTime));
        }

        // Handle CompletableFuture, the common case with DataLoaders
        if (result instanceof CompletionStage<?> stage) {
            return stage.whenComplete((v, e) -> {
                if (e == null) {
                    recordSuccess(operationType, operationName, startTime);
                } else {
                    recordError(operationType, operationName, e, startTime);
                }
            });
        }

        // Handle synchronous results
        recordSuccess(operationType, operationName, startTime);
        return result;
    }

    private boolean isQuery(ProceedingJoinPoint joinPoint) {
        return ((MethodSignature) joinPoint.getSignature()).getMethod()
                .isAnnotationPresent(DgsQuery.class);
    }

    private void recordSuccess(String type, String name, long startTime) {
        Duration duration = Duration.ofNanos(System.nanoTime() - startTime);

        meterRegistry.timer("graphql.operation.duration",
                "operation_type", type,
                "operation_name", name,
                "status", "success"
        ).record(duration);

        meterRegistry.counter("graphql.operation.processed",
                "operation_type", type,
                "operation_name", name
        ).increment();

        if (duration.compareTo(SLOW_THRESHOLD) > 0) {
            log.warn("Slow GraphQL operation",
                    kv("operation", name),
                    kv("type", type),
                    kv("durationMs", duration.toMillis()));
        }
    }

    private void recordError(String type, String name, Throwable e, long startTime) {
        Duration duration = Duration.ofNanos(System.nanoTime() - startTime);

        meterRegistry.timer("graphql.operation.duration",
                "operation_type", type,
                "operation_name", name,
                "status", "error"
        ).record(duration);

        meterRegistry.counter("graphql.operation.errors",
                "operation_type", type,
                "operation_name", name,
                "error_type", e.getClass().getSimpleName()
        ).increment();
    }
}

This gives you three metrics per operation:

  • graphql.operation.duration — histogram of execution times, tagged by operation name and type
  • graphql.operation.processed — counter of successful operations
  • graphql.operation.errors — counter of failed operations, tagged by error type

Why AOP Over DGS Instrumentation

DGS supports graphql-java’s Instrumentation interface for metrics. However, AOP has advantages:

  • Cleaner separation — metrics code doesn’t mix with GraphQL execution internals.
  • Access to Spring context — AOP can inspect method signatures, annotations, and dependency context.
  • Reactive awareness — the aspect can detect Mono/Flux returns and attach callbacks without blocking.

The trade-off is that AOP operates at the Spring method level, not the GraphQL field level. If you need per-field metrics (e.g., which specific field in a query is slow), use Instrumentation. For operation-level observability, AOP is simpler and sufficient.

Structured Logging for GraphQL

Every GraphQL operation should produce a structured log entry that’s queryable in your log aggregator:

Java
log.info("GraphQL operation completed",
        kv("operation", "products.search"),
        kv("type", "query"),
        kv("durationMs", 42),
        kv("resultCount", 20),
        kv("searchText", "electronics"));

This produces JSON in Cloud Logging / ELK / Datadog that you can filter:

JSON
{
    "message": "GraphQL operation completed",
    "operation": "products.search",
    "type": "query",
    "durationMs": 42,
    "resultCount": 20,
    "searchText": "electronics"
}

The key principle: structured key-value pairs, not string interpolation. log.info("Search took {}ms for query '{}'", duration, searchText) produces an unstructured string that’s nearly impossible to filter in production log aggregators.
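The kv(...) helper used throughout these examples is assumed to be StructuredArguments.kv from the logstash-logback-encoder library; to actually emit the key-value pairs as JSON fields, logback needs a matching encoder. A minimal logback.xml sketch:

```xml
<configuration>
    <appender name="JSON" class="ch.qos.logback.core.ConsoleAppender">
        <!-- LogstashEncoder renders kv(...) arguments as top-level JSON fields -->
        <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
    </appender>
    <root level="INFO">
        <appender-ref ref="JSON"/>
    </root>
</configuration>
```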

What’s Next

In Part 5, we move on to the concerns that emerge as your GraphQL API grows: testing strategies, schema evolution, and federation, which stitches multiple services into a single unified graph.


Cover photo by Sigmund on Unsplash.
