Catalog notification
Catalog notification
Catalog notifications let you know when a catalog is updated.
Enable Notifications
To receive notifications for a catalog, notifications must be enabled in that catalog's configuration.
Subscribe to Notifications
To subscribe to a notification stream, call subscribeToNotifications on the catalog's QueryApi.
// Obtain the QueryApi for the catalog whose publications should be observed.
val queryApi = DataClient().queryApi(catalogHrn)

// Build the subscription settings, naming the consumer group.
val consumerSettings = NotificationConsumerSettings(groupName = "consumer-group-name")

// Subscribe with a callback; the resulting Future carries the subscription control object.
val control: Future[NotificationSubscriptionControl] =
  queryApi.subscribeToNotifications(
    consumerSettings,
    notification =>
      // invoked each time a new batch publication happens in the catalog
      println(s"catalog ${catalogHrn} has a new version ${notification.version}")
  )
// create queryApi for target catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// Build the subscription settings, naming the consumer group.
NotificationConsumerSettings consumerSettings =
    new NotificationConsumerSettings.Builder()
        .withGroupName("consumer-group-name")
        .build();

// Subscribe with a callback; the resulting stage carries the subscription control object.
CompletionStage<NotificationSubscriptionControl> controlStage =
    queryApi.subscribeToNotifications(
        consumerSettings,
        // invoked each time a new batch publication happens in the catalog
        publication ->
            System.out.printf(
                "catalog %s has a new version %d\n",
                catalogHrn,
                publication.getCatalogVersion()));
For better control, you can subscribe to a Pekko stream source.
// Obtain the QueryApi for the catalog whose publications should be observed.
val queryApi = DataClient().queryApi(catalogHrn)

// Build the subscription settings, naming the consumer group.
val consumerSettings = NotificationConsumerSettings(groupName = "consumer-group-name")

// Subscribe without a callback, run the notification stream, and keep the control object.
val control: Future[NotificationSubscriptionControl] =
  for (subscription <- queryApi.subscribeToNotifications(consumerSettings)) yield {
    // consume the notification stream; the sink body runs for each new batch publication
    subscription.notifications.runWith(
      Sink.foreach[BatchPublicationNotification] { notification =>
        println(s"catalog ${catalogHrn} has a new version ${notification.version}")
      }
    )
    // the value of the Future is the subscription's control object
    subscription.subscriptionControl
  }
// create queryApi for target catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// Build the subscription settings, naming the consumer group.
NotificationConsumerSettings consumerSettings =
    new NotificationConsumerSettings.Builder()
        .withGroupName("consumer-group-name")
        .build();

// Subscribe without a callback, run the notification stream, and keep the control object.
CompletionStage<NotificationSubscriptionControl> controlStage =
    queryApi
        .subscribeToNotifications(consumerSettings)
        .thenApply(
            active -> {
              // consume the notification stream; the sink body runs for each new batch
              // publication in the catalog
              active
                  .notifications()
                  .runWith(
                      Sink.foreach(
                          publication ->
                              System.out.printf(
                                  "catalog %s has a new version %d\n",
                                  catalogHrn,
                                  publication.getCatalogVersion())),
                      myMaterializer);
              // the value of the stage is the subscription's control object
              return active.subscriptionControl();
            });
To cancel the subscription, call NotificationSubscriptionControl.shutdown.
// Scala
control.flatMap(_.shutdown())
// Java
controlStage.thenCompose(control -> control.shutdown());
Updated 22 days ago