Real-time a reaktivní vzory s Netflix DGS

Čtvrtý díl ze sedmi v sérii “Production GraphQL with Netflix DGS”
REST API fungují na principu request-response: klient se zeptá, server odpoví, spojení se zavře. GraphQL subscriptions tento vzor narušují — server posílá aktualizace klientovi v okamžiku, kdy nastanou. Tento článek popisuje, jak DGS implementuje subscriptions, jak fungují reaktivní typy napříč frameworkem a jak to celé monitorovat v produkci.
GraphQL Subscriptions
Subscriptions umožňují klientům přijímat proud aktualizací přes persistentní spojení. Typické případy použití: živé výsledkové tabule, sledování průběhu, notifikace, kolaborativní editace.
Schéma
Subscriptions se definují společně s queries a mutations:
type Subscription {
orderStatusUpdates(orderId: ID!): OrderStatusUpdate
inventoryAlerts(warehouseId: ID!): InventoryAlert
}
type OrderStatusUpdate {
orderId: ID!
status: OrderStatus!
timestamp: DateTime!
message: String
}Implementace na serveru
DGS subscription vrací Publisher<T> — typicky Reactor Flux:
@DgsComponent
@RequiredArgsConstructor
public class OrderSubscriptionController {
private final OrderEventPublisher orderEventPublisher;
private final CurrentUserProvider currentUser;
@DgsSubscription
public Publisher<OrderStatusUpdate> orderStatusUpdates(
@InputArgument String orderId) {
return Flux.from(orderEventPublisher.subscribe(orderId))
.map(event -> OrderStatusUpdate.newBuilder()
.orderId(event.orderId())
.status(mapStatus(event.status()))
.timestamp(event.timestamp())
.message(event.message())
.build());
}
}Průběh komunikace:
Transport: Funguje to samo
Od DGS 10.0 je subscription transport kompletně řešen pomocí Spring for GraphQL. Nepotřebujete vlastní WebSocket handlery — závislost graphql-dgs-spring-graphql-starter zahrnuje nativní podporu protokolu graphql-transport-ws.
Jediná infrastruktura, kterou potřebujete, je bean WebSocketService pro WebFlux aplikace:
@Configuration
public class WebSocketConfig {
@Bean
public WebSocketService webSocketService() {
return new HandshakeWebSocketService(
new ReactorNettyRequestUpgradeStrategy());
}
@Bean
public WebSocketHandlerAdapter webSocketHandlerAdapter(WebSocketService service) {
return new WebSocketHandlerAdapter(service);
}
}Subscriptions se obsluhují na stejném endpointu /graphql jako queries a mutations — upgrade protokolu se řeší automaticky.
Starší verze DGS vyžadovaly vlastní WebSocket handlery a dedikovaný endpoint /subscriptions. Pokud migrujete z verze před 10.0, můžete je odstranit — budou ve skutečnosti kolidovat s nativní implementací.
Autentizace v subscriptions
Subscriptions běží v jiném execution kontextu než queries. Počáteční HTTP request nese autentizační hlavičky, ale následné zprávy přes WebSocket už nemusí. Řešte to v okamžiku nastavení subscription:
@DgsSubscription
public Publisher<ScoreUpdate> liveScores(@InputArgument String boardId) {
// Get authentication during the initial subscription request
return currentUser.getUserId()
.flatMapMany(userId -> {
log.info("Subscription started",
kv("boardId", boardId), kv("userId", userId));
return eventPublisher.subscribe(boardId)
.map(this::mapToGraphQL)
.doOnComplete(() -> log.info("Subscription completed",
kv("boardId", boardId), kv("userId", userId)));
});
}Pro veřejné subscriptions (například živá sportovní tabule nebo veřejný feed událostí) je autentizace volitelná:
@DgsSubscription
public Publisher<ScoreUpdate> publicScores(@InputArgument String boardId) {
// No @PreAuthorize — this is intentionally public
return eventPublisher.subscribe(boardId)
.map(this::mapToGraphQL);
}Strategie reaktivních typů
DGS data fetchery potřebují vracet asynchronní výsledky. Dvě možnosti: CompletableFuture<T> a Mono<T>. Obě fungují a každá má své kompromisy.
data fetcher do?"} -->|Uses DataLoaders| CF["Return CompletableFuture<T>"] Q -->|Reactive repos, no DataLoaders| M["Return Mono<T>"] Q -->|Streams multiple items| F["Return Publisher<T> / Flux<T>"] F -.->|Subscriptions only| SUB["@DgsSubscription"] style CF fill:#4a9eff,stroke:#2171c7,color:#fff style M fill:#00bfa5,stroke:#00897b,color:#fff style F fill:#7c4dff,stroke:#5e35b1,color:#fff style SUB fill:#7c4dff,stroke:#5e35b1,color:#fff
Proč CompletableFuture pro data fetchery
Většina DGS data fetcherů by měla vracet CompletableFuture<T>:
@DgsQuery
public CompletableFuture<Product> product(@InputArgument String id) {
return productService.findById(id).toFuture();
}Důvodem je kompatibilita s DataLoaderem. DataLoader API je postavené na CompletionStage (nadtyp CompletableFuture). Když field resolver zavolá dataLoader.load(key), dostane CompletionStage. Pokud váš resolver vrací Mono, ale interně používá DataLoadery, skončíte s konverzemi mezi typy:
// DataLoader returns CompletionStage — wrapping in Mono is unnecessary overhead
DataLoader<String, Product> loader = dfe.getDataLoader("products");
return Mono.fromCompletionStage(loader.load(productId)); // Mono just to unwrap a CompletionStageCompletableFuture jako návratový typ udržuje asynchronní řetězec konzistentní.
Kdy dává smysl Mono<T>
Vracejte Mono<T> přímo, když:
- Používáte reaktivní repozitáře nebo WebClient volání
- Nepoužíváte DataLoadery
- Chcete skládat více reaktivních operací
@DgsQuery
@PreAuthorize("isAuthenticated()")
public Mono<ProductPage> myProducts(
@InputArgument Integer pageNumber,
@InputArgument Integer pageSize) {
return currentUser.getUserId().flatMap(userId -> {
return productService.findByOwner(userId, pageNumber, pageSize);
});
}DGS oba návratové typy zpracovává transparentně — nepotřebujete žádné adaptéry ani konfiguraci.
Subscriptions vždy používají Publisher
Subscriptions musí vracet Publisher<T> (rozhraní Reactive Streams). V praxi to znamená Flux<T>:
@DgsSubscription
public Publisher<ProgressUpdate> importProgress(@InputArgument String jobId) {
return Flux.from(progressPublisher.subscribe(jobId))
.map(this::mapToGraphQL);
}Mono<T> by emitoval maximálně jednu položku a skončil — pro subscription to není užitečné.
Observabilita: Měřte to, na čem záleží
Nemůžete optimalizovat to, co nemůžete měřit. AOP-based metrics aspect vám dá přehled o každé GraphQL operaci bez úpravy business kódu.
Metriky operací
Obalte metody @DgsQuery a @DgsMutation měřením času a počítáním:
@Aspect
@Component
@RequiredArgsConstructor
public class GraphQLMetrics {
private final MeterRegistry meterRegistry;
private static final Duration SLOW_THRESHOLD = Duration.ofSeconds(2);
@Around("@annotation(com.netflix.graphql.dgs.DgsQuery) || " +
"@annotation(com.netflix.graphql.dgs.DgsMutation)")
public Object measureOperation(ProceedingJoinPoint joinPoint) throws Throwable {
String operationName = joinPoint.getSignature().getName();
String operationType = isQuery(joinPoint) ? "query" : "mutation";
long startTime = System.nanoTime();
Object result = joinPoint.proceed();
// Handle reactive return types
if (result instanceof Mono<?> mono) {
return mono
.doOnSuccess(v -> recordSuccess(operationType, operationName, startTime))
.doOnError(e -> recordError(operationType, operationName, e, startTime));
}
// Handle synchronous results
recordSuccess(operationType, operationName, startTime);
return result;
}
private void recordSuccess(String type, String name, long startTime) {
Duration duration = Duration.ofNanos(System.nanoTime() - startTime);
meterRegistry.timer("graphql.operation.duration",
"operation_type", type,
"operation_name", name,
"status", "success"
).record(duration);
meterRegistry.counter("graphql.operation.processed",
"operation_type", type,
"operation_name", name
).increment();
if (duration.compareTo(SLOW_THRESHOLD) > 0) {
log.warn("Slow GraphQL operation",
kv("operation", name),
kv("type", type),
kv("durationMs", duration.toMillis()));
}
}
}Získáte tři metriky na operaci:
graphql.operation.duration— histogram doby provádění, tagovaný podle jména a typu operacegraphql.operation.processed— čítač úspěšných operacígraphql.operation.errors— čítač neúspěšných operací, tagovaný podle typu chyby
Proč AOP místo DGS Instrumentation
DGS podporuje rozhraní Instrumentation z graphql-java pro metriky. AOP však má své výhody:
- Čistější oddělení — kód metrik se nemíchá s interními detaily GraphQL execution.
- Přístup ke Spring kontextu — AOP může inspektovat signatury metod, anotace a kontext závislostí.
- Reaktivní povědomí — aspect dokáže detekovat
Mono/Fluxnávratové typy a připojit callbacky bez blokování.
Kompromis je, že AOP operuje na úrovni Spring metod, ne na úrovni GraphQL polí. Pokud potřebujete metriky na úrovni jednotlivých polí (např. které konkrétní pole v query je pomalé), použijte Instrumentation. Pro observabilitu na úrovni operací je AOP jednodušší a dostačující.
Strukturované logování pro GraphQL
Každá GraphQL operace by měla vyprodukovat strukturovaný log záznam, který lze dotazovat ve vašem log agregátoru:
log.info("GraphQL operation completed",
kv("operation", "products.search"),
kv("type", "query"),
kv("durationMs", 42),
kv("resultCount", 20),
kv("searchText", "electronics"));Výsledkem je JSON v Cloud Logging / ELK / Datadog, který můžete filtrovat:
{
"message": "GraphQL operation completed",
"operation": "products.search",
"type": "query",
"durationMs": 42,
"resultCount": 20,
"searchText": "electronics"
}Klíčový princip: strukturované key-value páry, ne interpolace řetězců. log.info("Search took {}ms for query '{}'", duration, searchText) produkuje nestrukturovaný řetězec, který je v produkčních log agregátorech téměř nemožné filtrovat.
Co bude dál
V pátém díle sérii uzavřeme tématy, která se objeví s růstem vaší GraphQL API: testovací strategie, evoluce schématu a federation — propojení více služeb do jednoho unifikovaného graphu.


