Produkty Poradenství O nás Blog Kontakt English
arrow_back Zpět na blog

Real-time a reaktivní vzory s Netflix DGS

Real-time a reaktivní vzory s Netflix DGS

Čtvrtý díl ze sedmi v sérii “Production GraphQL with Netflix DGS”


REST API fungují na principu request-response: klient se zeptá, server odpoví, spojení se zavře. GraphQL subscriptions tento vzor narušují — server posílá aktualizace klientovi v okamžiku, kdy nastanou. Tento článek popisuje, jak DGS implementuje subscriptions, jak fungují reaktivní typy napříč frameworkem a jak to celé monitorovat v produkci.

GraphQL Subscriptions

Subscriptions umožňují klientům přijímat proud aktualizací přes persistentní spojení. Typické případy použití: živé výsledkové tabule, sledování průběhu, notifikace, kolaborativní editace.

Schéma

Subscriptions se definují společně s queries a mutations:

GRAPHQL
type Subscription {
    orderStatusUpdates(orderId: ID!): OrderStatusUpdate
    inventoryAlerts(warehouseId: ID!): InventoryAlert
}

type OrderStatusUpdate {
    orderId: ID!
    status: OrderStatus!
    timestamp: DateTime!
    message: String
}

Implementace na serveru

DGS subscription vrací Publisher<T> — typicky Reactor Flux:

Java
@DgsComponent
@RequiredArgsConstructor
public class OrderSubscriptionController {

    private final OrderEventPublisher orderEventPublisher;
    private final CurrentUserProvider currentUser;

    @DgsSubscription
    public Publisher<OrderStatusUpdate> orderStatusUpdates(
            @InputArgument String orderId) {

        return Flux.from(orderEventPublisher.subscribe(orderId))
                .map(event -> OrderStatusUpdate.newBuilder()
                        .orderId(event.orderId())
                        .status(mapStatus(event.status()))
                        .timestamp(event.timestamp())
                        .message(event.message())
                        .build());
    }
}

Průběh komunikace:

sequenceDiagram participant Client participant WS as WebSocket participant DGS participant Sub as DgsSubscription participant Pub as EventPublisher Client->>WS: Subscribe via graphql-transport-ws WS->>DGS: Subscription request DGS->>Sub: orderStatusUpdates(orderId) Sub->>Pub: subscribe(orderId) Pub-->>Sub: Flux of Events loop Each event Pub-->>Sub: Event emitted Sub-->>DGS: OrderStatusUpdate DGS-->>WS: Serialized JSON WS-->>Client: Push update end Note over Client,Pub: Ends on Flux complete or client disconnect

Transport: Funguje to samo

Od DGS 10.0 je subscription transport kompletně řešen pomocí Spring for GraphQL. Nepotřebujete vlastní WebSocket handlery — závislost graphql-dgs-spring-graphql-starter zahrnuje nativní podporu protokolu graphql-transport-ws.

Jediná infrastruktura, kterou potřebujete, je bean WebSocketService pro WebFlux aplikace:

Java
@Configuration
public class WebSocketConfig {

    @Bean
    public WebSocketService webSocketService() {
        return new HandshakeWebSocketService(
                new ReactorNettyRequestUpgradeStrategy());
    }

    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter(WebSocketService service) {
        return new WebSocketHandlerAdapter(service);
    }
}

Subscriptions se obsluhují na stejném endpointu /graphql jako queries a mutations — upgrade protokolu se řeší automaticky.

Starší verze DGS vyžadovaly vlastní WebSocket handlery a dedikovaný endpoint /subscriptions. Pokud migrujete z verze před 10.0, můžete je odstranit — budou ve skutečnosti kolidovat s nativní implementací.

Autentizace v subscriptions

Subscriptions běží v jiném execution kontextu než queries. Počáteční HTTP request nese autentizační hlavičky, ale následné zprávy přes WebSocket už nemusí. Řešte to v okamžiku nastavení subscription:

Java
@DgsSubscription
public Publisher<ScoreUpdate> liveScores(@InputArgument String boardId) {
    // Get authentication during the initial subscription request
    return currentUser.getUserId()
            .flatMapMany(userId -> {
                log.info("Subscription started",
                        kv("boardId", boardId), kv("userId", userId));

                return eventPublisher.subscribe(boardId)
                        .map(this::mapToGraphQL)
                        .doOnComplete(() -> log.info("Subscription completed",
                                kv("boardId", boardId), kv("userId", userId)));
            });
}

Pro veřejné subscriptions (například živá sportovní tabule nebo veřejný feed událostí) je autentizace volitelná:

Java
@DgsSubscription
public Publisher<ScoreUpdate> publicScores(@InputArgument String boardId) {
    // No @PreAuthorize — this is intentionally public
    return eventPublisher.subscribe(boardId)
            .map(this::mapToGraphQL);
}

Strategie reaktivních typů

DGS data fetchery potřebují vracet asynchronní výsledky. Dvě možnosti: CompletableFuture<T> a Mono<T>. Obě fungují a každá má své kompromisy.

flowchart TD Q{"What does your
data fetcher do?"} -->|Uses DataLoaders| CF["Return CompletableFuture<T>"] Q -->|Reactive repos, no DataLoaders| M["Return Mono<T>"] Q -->|Streams multiple items| F["Return Publisher<T> / Flux<T>"] F -.->|Subscriptions only| SUB["@DgsSubscription"] style CF fill:#4a9eff,stroke:#2171c7,color:#fff style M fill:#00bfa5,stroke:#00897b,color:#fff style F fill:#7c4dff,stroke:#5e35b1,color:#fff style SUB fill:#7c4dff,stroke:#5e35b1,color:#fff

Proč CompletableFuture pro data fetchery

Většina DGS data fetcherů by měla vracet CompletableFuture<T>:

Java
@DgsQuery
public CompletableFuture<Product> product(@InputArgument String id) {
    return productService.findById(id).toFuture();
}

Důvodem je kompatibilita s DataLoaderem. DataLoader API je postavené na CompletionStage (nadtyp CompletableFuture). Když field resolver zavolá dataLoader.load(key), dostane CompletionStage. Pokud váš resolver vrací Mono, ale interně používá DataLoadery, skončíte s konverzemi mezi typy:

Java
// DataLoader returns CompletionStage — wrapping in Mono is unnecessary overhead
DataLoader<String, Product> loader = dfe.getDataLoader("products");
return Mono.fromCompletionStage(loader.load(productId));  // Mono just to unwrap a CompletionStage

CompletableFuture jako návratový typ udržuje asynchronní řetězec konzistentní.

Kdy dává smysl Mono<T>

Vracejte Mono<T> přímo, když:

  • Používáte reaktivní repozitáře nebo WebClient volání
  • Nepoužíváte DataLoadery
  • Chcete skládat více reaktivních operací
Java
@DgsQuery
@PreAuthorize("isAuthenticated()")
public Mono<ProductPage> myProducts(
        @InputArgument Integer pageNumber,
        @InputArgument Integer pageSize) {

    return currentUser.getUserId().flatMap(userId -> {
        return productService.findByOwner(userId, pageNumber, pageSize);
    });
}

DGS oba návratové typy zpracovává transparentně — nepotřebujete žádné adaptéry ani konfiguraci.

Subscriptions vždy používají Publisher

Subscriptions musí vracet Publisher<T> (rozhraní Reactive Streams). V praxi to znamená Flux<T>:

Java
@DgsSubscription
public Publisher<ProgressUpdate> importProgress(@InputArgument String jobId) {
    return Flux.from(progressPublisher.subscribe(jobId))
            .map(this::mapToGraphQL);
}

Mono<T> by emitoval maximálně jednu položku a skončil — pro subscription to není užitečné.

Observabilita: Měřte to, na čem záleží

Nemůžete optimalizovat to, co nemůžete měřit. AOP-based metrics aspect vám dá přehled o každé GraphQL operaci bez úpravy business kódu.

Metriky operací

Obalte metody @DgsQuery a @DgsMutation měřením času a počítáním:

Java
@Aspect
@Component
@RequiredArgsConstructor
public class GraphQLMetrics {

    private final MeterRegistry meterRegistry;
    private static final Duration SLOW_THRESHOLD = Duration.ofSeconds(2);

    @Around("@annotation(com.netflix.graphql.dgs.DgsQuery) || " +
            "@annotation(com.netflix.graphql.dgs.DgsMutation)")
    public Object measureOperation(ProceedingJoinPoint joinPoint) throws Throwable {
        String operationName = joinPoint.getSignature().getName();
        String operationType = isQuery(joinPoint) ? "query" : "mutation";
        long startTime = System.nanoTime();

        Object result = joinPoint.proceed();

        // Handle reactive return types
        if (result instanceof Mono<?> mono) {
            return mono
                    .doOnSuccess(v -> recordSuccess(operationType, operationName, startTime))
                    .doOnError(e -> recordError(operationType, operationName, e, startTime));
        }

        // Handle synchronous results
        recordSuccess(operationType, operationName, startTime);
        return result;
    }

    private void recordSuccess(String type, String name, long startTime) {
        Duration duration = Duration.ofNanos(System.nanoTime() - startTime);

        meterRegistry.timer("graphql.operation.duration",
                "operation_type", type,
                "operation_name", name,
                "status", "success"
        ).record(duration);

        meterRegistry.counter("graphql.operation.processed",
                "operation_type", type,
                "operation_name", name
        ).increment();

        if (duration.compareTo(SLOW_THRESHOLD) > 0) {
            log.warn("Slow GraphQL operation",
                    kv("operation", name),
                    kv("type", type),
                    kv("durationMs", duration.toMillis()));
        }
    }
}

Získáte tři metriky na operaci:

  • graphql.operation.duration — histogram doby provádění, tagovaný podle jména a typu operace
  • graphql.operation.processed — čítač úspěšných operací
  • graphql.operation.errors — čítač neúspěšných operací, tagovaný podle typu chyby

Proč AOP místo DGS Instrumentation

DGS podporuje rozhraní Instrumentation z graphql-java pro metriky. AOP však má své výhody:

  • Čistější oddělení — kód metrik se nemíchá s interními detaily GraphQL execution.
  • Přístup ke Spring kontextu — AOP může inspektovat signatury metod, anotace a kontext závislostí.
  • Reaktivní povědomí — aspect dokáže detekovat Mono/Flux návratové typy a připojit callbacky bez blokování.

Kompromis je, že AOP operuje na úrovni Spring metod, ne na úrovni GraphQL polí. Pokud potřebujete metriky na úrovni jednotlivých polí (např. které konkrétní pole v query je pomalé), použijte Instrumentation. Pro observabilitu na úrovni operací je AOP jednodušší a dostačující.

Strukturované logování pro GraphQL

Každá GraphQL operace by měla vyprodukovat strukturovaný log záznam, který lze dotazovat ve vašem log agregátoru:

Java
log.info("GraphQL operation completed",
        kv("operation", "products.search"),
        kv("type", "query"),
        kv("durationMs", 42),
        kv("resultCount", 20),
        kv("searchText", "electronics"));

Výsledkem je JSON v Cloud Logging / ELK / Datadog, který můžete filtrovat:

JSON
{
    "message": "GraphQL operation completed",
    "operation": "products.search",
    "type": "query",
    "durationMs": 42,
    "resultCount": 20,
    "searchText": "electronics"
}

Klíčový princip: strukturované key-value páry, ne interpolace řetězců. log.info("Search took {}ms for query '{}'", duration, searchText) produkuje nestrukturovaný řetězec, který je v produkčních log agregátorech téměř nemožné filtrovat.

Co bude dál

V pátém díle sérii uzavřeme tématy, která se objeví s růstem vaší GraphQL API: testovací strategie, evoluce schématu a federation — propojení více služeb do jednoho unifikovaného graphu.


Úvodní foto od Sigmund na Unsplash.

Další z blogu