How to compose an incremental processing pipeline with DeltaSets
Note
DeltaSets is a new feature of Data Processing Library and the API might change in future versions. DeltaSets can only be used in Scala projects, not in Java projects.
DeltaSets are a new distributed processing abstraction provided by Data Processing Library. Similar
to Spark RDDs, DeltaSets provide a functional interface for transforming data in a cluster, with
transformations such as mapReduce and filterByKey. Their main difference from RDDs is that
DeltaSet transformations can be computed incrementally if required.
DeltaSets allow you to build custom Compilation Patterns, which means that you can
have compilers with as many resolveFn, compileInFn, and compileOutFn functions as required by
your particular application.
Design
The main processing abstraction is a DeltaSet[K, V], where K is the type of the keys and V is the type
of the values. The DeltaSet represents a collection of key-value pairs that is stored and transformed in
a Spark cluster. A key can be associated with one value only.
- K is often com.here.platform.data.processing.catalog.Partition.Key, which identifies a partition inside the platform catalog. However, K can be any type that is Serializable and has an implicit Ordering defined, for example strings, integers, and tuples.
- V is often com.here.platform.data.processing.catalog.Partition.Meta, which identifies data in the platform catalog, or com.here.platform.data.processing.blobstore.Payload, which holds the actual data stored in the catalog. However, V can be any type, even an integer or a string.
For example, reading the contents of a platform catalog layer results in a DeltaSet[Key, Meta].
A DeltaSet is always immutable, but transformations can be applied to it resulting in a
transformed DeltaSet. For example, the transformation mapValues({x => x + 1}) can be used to
transform a DeltaSet[Key, Int] into a new DeltaSet[Key, Int] in which all
values are incremented by one.
Once a DeltaSet is transformed into DeltaSet[Key, Payload] to contain the desired
payloads for the output catalog, you can publish it. This results in a
PublishedSet, which you can then commit as a new version of the output catalog.
The transformations are always lazy, which means that they are only performed when you commit the output catalog. In other words, a DeltaSet is not evaluated until it is committed to the platform catalog.
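The laziness can be illustrated with a toy model in plain Scala. LazySet and its methods are hypothetical names invented for this sketch, not part of the Data Processing Library: each transformation only records a closure over the previous one, and nothing runs until commit() is called. Under that simplifying assumption, it mirrors how DeltaSet transformations are deferred until the output catalog is committed.

```scala
// Hypothetical toy model, not the DeltaSet API: each transformation only
// records a function; nothing is computed until commit() is called.
final class LazySet[K, V](build: () => Map[K, V]) {
  // Returns a new, still unevaluated set; `this` is left unchanged.
  def mapValues[W](f: V => W): LazySet[K, W] =
    new LazySet(() => build().map { case (k, v) => (k, f(v)) })

  // Keeps only the pairs whose key satisfies the predicate, also lazily.
  def filterByKey(p: K => Boolean): LazySet[K, V] =
    new LazySet(() => build().filter { case (k, _) => p(k) })

  // The only place where the recorded transformations are executed.
  def commit(): Map[K, V] = build()
}
```

Because every LazySet is immutable and only wraps a function, chaining transformations is cheap; the cost is paid once, at commit time.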
Example: copy a layer
This example shows how to implement a pipeline that copies a layer from one catalog to another catalog.
The simplest way to use DeltaSets is to extend the DeltaSimpleSetup in the application's Main
object. To add support for the standard configuration files and command line options for pipelines,
in this example you also extend the PipelineRunner trait (see Set up and Run the Driver), giving our Main object the
following skeleton:
import com.here.platform.data.processing.catalog.{Catalog, Layer}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
import com.here.platform.data.processing.driver.runner.pipeline.PipelineRunner
object Main extends PipelineRunner with DeltaSimpleSetup {
val applicationVersion: String = "1.0"
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
???
}
}
DeltaSimpleSetup requires the Main object to implement the setupSets method, which defines the
processing logic of the pipeline. Processing logic defined using DeltaSets can be structured in many
different ways, typically in four phases:
1. Query the Key and Meta pairs from one or more input catalogs, resulting in a DeltaSet[Key, Meta].
2. Retrieve the payloads corresponding to the metadata, resulting in a DeltaSet[Key, Payload].
3. Transform the data stored in the payloads and rewrite the keys so that they refer to the target catalog and the target layer.
4. Publish the transformed payloads, resulting in a PublishedSet that is then committed to the output catalog.
In this example, you copy the payloads without modifying them, instead of transforming them in Step 3.
- Query: To query the Key and Meta pairs from one or more input catalogs, use the DeltaContext that the setupSets method receives as an argument; among other things, it provides access to the input catalogs.
import com.here.platform.data.processing.catalog.Partition._
val keyMetas: DeltaSet[Key, Meta] =
context.queryCatalogLayer(Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))
- Retrieve: To retrieve the Payloads corresponding to the metadata, import the DeltaSet transformations from the context and get an instance of the Retriever object for the corresponding catalog. Then, keyMetas can be transformed.
import com.here.platform.data.processing.blobstore.Payload
import context.transformations._ // This import enables transformations on DeltaSets
val retriever = context.inRetriever(Catalog.Id("inCatalogA"))
val keyPayloads: DeltaSet[Key, Payload] =
keyMetas.mapValuesWithKey((key, meta) => retriever.getPayload(key, meta))
- Process: To rewrite the keys so that they refer to the target catalog and the target layer, use the mapKeys operation.
val rewrittenKeys: DeltaSet[Key, Payload] =
keyPayloads.mapKeys(
OneToOne(
_.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
_.copy(catalog = Catalog.Id("inCatalogA"), layer = Layer.Id("inLayerA"))
),
PreservesPartitioning
)
- Publish: To publish the transformed payloads, use the publish operation that every DeltaSet[Key, Payload] provides; it takes the set of layers to publish to as an argument.
val result: PublishedSet = rewrittenKeys.publish(Set(Layer.Id("outLayerA")))
Iterable(result)
The complete example is shown below:
import com.here.platform.data.processing.catalog.{Catalog, Layer, Partition}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
import com.here.platform.data.processing.driver.runner.pipeline.PipelineRunner
object Main extends PipelineRunner with DeltaSimpleSetup {
val applicationVersion: String = "1.0"
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
import context.transformations._
val retriever = context.inRetriever(Catalog.Id("inCatalogA"))
Iterable(
context
.queryCatalogLayer(Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))
.mapValuesWithKey((key, meta) => retriever.getPayload(key, meta))
.mapKeys(
OneToOne[Partition.Key, Partition.Key](
_.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
_.copy(catalog = Catalog.Id("inCatalogA"), layer = Layer.Id("inLayerA"))
),
PreservesPartitioning
)
.publish(Set(Layer.Id("outLayerA")))
)
}
}
Transformations
This section explains the transformations currently available on DeltaSets.
Publish payloads
Every DeltaSet must eventually be turned into a set of payloads that contain the data to be
published in the output catalog. The publish operation is available on any DeltaSet of type
DeltaSet[Key, Payload], and it uploads all payloads to the Data API. The result of
the publish operation is a PublishedSet, which can be returned from setupSets but cannot
be transformed any further, apart from reading it back with readBack. For example:
val payloads: DeltaSet[Key, Payload] = ???
val published: PublishedSet = payloads.publish(Set(Layer.Id("outLayer")))
You cannot publish to the same layer multiple times. If you need to publish parts of
an output layer individually, use publishPart instead.
(Advanced) Multi-part publish
Use publishPart instead of publish when you need to upload disjoint parts of an output layer (or a set of
output layers) separately and read back each part individually. Each output key
must be deterministically assigned to a single part through a PublishedPartMapper.
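To illustrate what a deterministic part assignment means, here is a self-contained sketch. Tile, PartsByLevel, and aggregateToParents are hypothetical names, and tiles use a simplified quadtree addressing (level, x, y) rather than the actual HERE tile encoding. Because the part is derived from the key alone, every key lands in exactly one part, so the level-12 and level-11 parts cannot overlap.

```scala
// Hypothetical simplified quadtree tile, not the HERE tile encoding.
final case class Tile(level: Int, x: Int, y: Int) {
  // Parent one level up; halving the indices assumes they are non-negative.
  def parent: Tile = Tile(level - 1, x / 2, y / 2)
}

// Hypothetical part mapper: the part is a pure function of the key (its level),
// so the assignment is deterministic and the parts are disjoint.
final class PartsByLevel(levels: Set[Int]) {
  def partFor(tile: Tile): Int = {
    require(levels.contains(tile.level), s"no part for level ${tile.level}")
    tile.level
  }
}

// Builds the coarser part by summing the values of each parent's children.
def aggregateToParents(part: Map[Tile, Int]): Map[Tile, Int] =
  part.groupMapReduce(_._1.parent)(_._2)(_ + _)
```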
In the example below, a PartMapperByLevel is used to map keys to different publish parts based on
their zoom level. Partitions at zoom level 12 are published first, and then read back to build
aggregated partitions at the coarser zoom level 11. Finally, the unionPublishedParts operation
combines all parts into the final PublishedSet:
val intermediate: DeltaSet[Key, Payload] = ???
val partMapper = PartMapperByLevel(Set(12, 11))
val firstPart: PublishedPart =
intermediate.publishPart(Set(Layer.Id("multi-level-layer")),
partMapper,
partMapper.partForLevel(12))
val secondPart: PublishedPart = firstPart
.readBack()
.mapGroup({
case (key, meta) => (key.copy(partition = key.partition.ancestors.head), (key, meta))
}, context.defaultPartitioner)
.mapValues { (partitions: Iterable[(Key, Meta)]) =>
// Aggregator is assumed to be application code (not shown) that builds one
// level-11 payload from the level-12 children of a tile.
val payload: Payload = Aggregator.aggregate(partitions)
payload
}
.publishPart(Set(Layer.Id("multi-level-layer")), partMapper, partMapper.partForLevel(11))
context.unionPublishedParts(Seq(firstPart, secondPart))
Transform values
Use mapValues and mapValuesWithKey to transform the values inside of a DeltaSet. Neither operation
modifies the keys in the DeltaSet, so neither needs to shuffle data between worker nodes in the
cluster when it runs. Consequently, these operations are very efficient.
In the following example, the values in a DeltaSet[Key, Int] are incremented by one using
mapValues.
val integers: DeltaSet[Key, Int] = ???
val incrementedIntegers: DeltaSet[Key, Int] = integers.mapValues(_ + 1)
mapValuesWithKey is a similar operation, but it also provides the key to the transformation
function. See the copy-a-layer example for details on using this operation.
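As a toy illustration of why these operations avoid shuffles, the following sketch models a DeltaSet as an immutable Scala Map. The mapValues and mapValuesWithKey functions here are hypothetical stand-ins that rewrite only the values, so the key set, and therefore the placement of each pair, is left untouched.

```scala
// Hypothetical toy stand-ins on an in-memory Map, not the DeltaSet API.
// Only values are rewritten; every key stays exactly where it was.
def mapValues[K, V, W](data: Map[K, V])(f: V => W): Map[K, W] =
  data.map { case (k, v) => (k, f(v)) }

// Same as above, but the transformation also sees the key.
def mapValuesWithKey[K, V, W](data: Map[K, V])(f: (K, V) => W): Map[K, W] =
  data.map { case (k, v) => (k, f(k, v)) }
```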
Transform keys and values
To transform both keys and values simultaneously, or to transform the key in a key-value pair based
on its value, use one of these transformations: mapUnique, mapGroup, or mapReduce. However, these
transformations shuffle data, so if performance is a concern, consider using mapValues,
mapValuesWithKey, or one of the mapKeys* operations where possible.
mapUnique transforms a key-value pair into a new key-value pair, as long as no duplicate keys are
produced. If duplicate keys are produced, the transformation will fail at run-time. Since data is
shuffled between nodes in the cluster, a partitioner must be explicitly provided as an argument.
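The duplicate-key rule can be sketched on an in-memory Map. This mapUnique is a hypothetical helper written for illustration, with keys modeled as (layer, id) tuples instead of Partition.Key; it raises an exception when two inputs produce the same output key, analogous to the run-time failure described above.

```scala
// Hypothetical toy model of mapUnique semantics on an in-memory Map.
def mapUnique[K, V, K2, V2](data: Map[K, V])(f: (K, V) => (K2, V2)): Map[K2, V2] = {
  val mapped = data.toSeq.map { case (k, v) => f(k, v) }
  // Output keys produced by more than one input pair violate the contract.
  val duplicates = mapped.groupBy(_._1).collect { case (k2, pairs) if pairs.size > 1 => k2 }
  if (duplicates.nonEmpty)
    throw new IllegalStateException(s"duplicate output keys: ${duplicates.mkString(", ")}")
  mapped.toMap
}
```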
For example, in the following snippet, the key-value pairs are split into two layers:
- Non-negative values: set the layer to positive_values.
- Negative values: set the layer to negative_values.
val deltaSet1: DeltaSet[Key, Int] = ???
val split: DeltaSet[Key, Int] =
deltaSet1.mapUnique(
mapFn = { (key, i) =>
if (i >= 0) {
(key.copy(layer = Layer.Id("positive_values")), i)
} else {
(key.copy(layer = Layer.Id("negative_values")), i)
}
},
partitioning = context.defaultPartitioner
)
mapGroup transforms a key-value pair into a new key-value pair. The values of duplicate keys are
grouped, such that the resulting DeltaSet assigns each key a collection of values.
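A minimal in-memory sketch of the grouping behavior follows. The mapGroup helper is hypothetical, and in the usage the parent relation tile / 4 is an arbitrary stand-in for a real HERE tile hierarchy.

```scala
// Hypothetical toy model of mapGroup on an in-memory Map: every pair is mapped
// to a new pair, and values that end up under the same key are collected.
def mapGroup[K, V, K2, V2](data: Map[K, V])(f: (K, V) => (K2, V2)): Map[K2, Iterable[V2]] =
  data.toSeq
    .map { case (k, v) => f(k, v) }
    .groupMap(_._1)(_._2)
```

For example, mapGroup(Map(0 -> "m0", 1 -> "m1", 5 -> "m5"))((tile, meta) => (tile / 4, meta)) collects "m0" and "m1" under key 0 and "m5" under key 1.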
For example, in the following snippet, metadata associated with HERE tile partitions are mapped to and grouped by their parent HERE tile.
val deltaSet1: DeltaSet[Key, Meta] = ???
val deltaSet2: DeltaSet[Key, Iterable[Meta]] =
deltaSet1.mapGroup(
mapFn = {
case (key, value) =>
(key.copy(partition = key.partition.parent.getOrElse(key.partition)), value)
},
partitioning = context.defaultPartitioner
)
mapReduce transforms each key-value pair into a new key-value pair and combines the values of duplicate
keys using the reduce function that you provide. This reduce function must combine two values into
one. Using mapReduce is more efficient than using mapGroup and then reducing each value using
mapValues.
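The following toy sketch shows why reducing as you go avoids building per-key collections. The mapReduce helper is hypothetical and works on a plain Map: it folds each mapped value into the running result for its output key instead of first grouping all values.

```scala
// Hypothetical toy model of mapReduce on an in-memory Map. Each mapped value is
// combined with the running value for its output key as soon as it is produced,
// so no per-key collection of values is materialized.
def mapReduce[K, V, K2, V2](data: Map[K, V])(f: (K, V) => (K2, V2))(
    reduceFn: (V2, V2) => V2): Map[K2, V2] =
  data.foldLeft(Map.empty[K2, V2]) { case (acc, (k, v)) =>
    val (k2, v2) = f(k, v)
    acc.updated(k2, acc.get(k2).fold(v2)(reduceFn(_, v2)))
  }
```

The result is the same as grouping first and then reducing each group; only the intermediate memory use differs.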
For example, in the following snippet, all integers associated with HERE tile partitions are mapped to their parent HERE tile and the value of each output HERE tile is reduced to its sum.
val deltaSet1: DeltaSet[Key, Int] = ???
val deltaSet2: DeltaSet[Key, Int] =
deltaSet1.mapReduce(
mapFn = {
case (key, value) =>
(key.copy(partition = key.partition.parent.getOrElse(key.partition)), value)
},
reduceFn = _ + _,
partitioning = context.defaultPartitioner
)
The mapUnique, mapGroup, and mapReduce operations take a mapping function that produces exactly one
key-value pair for each input pair. In contrast, the flatMapUnique, flatMapGroup, and flatMapReduce
operations take a mapping function that produces zero or more key-value pairs.
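To make the difference concrete, here is a hypothetical flatMapReduce helper over an in-memory Map, where the mapping function may return an empty collection (dropping the pair) or several pairs. It is a sketch of the semantics only, not the library's signature.

```scala
// Hypothetical toy model of flatMapReduce: each input pair may yield zero, one,
// or several output pairs, and values that share an output key are reduced.
def flatMapReduce[K, V, K2, V2](data: Map[K, V])(f: (K, V) => Iterable[(K2, V2)])(
    reduceFn: (V2, V2) => V2): Map[K2, V2] =
  data.toSeq
    .flatMap { case (k, v) => f(k, v) }
    .groupMapReduce(_._1)(_._2)(reduceFn)
```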
Transform keys
DeltaSets provide a set of transformations for modifying the keys in a DeltaSet without taking into account values. These transformations are very efficient and should be preferred over transformations based on keys and values if possible.
For example, mapKeys is an efficient operation to transform just the keys in a DeltaSet without
reading or writing the values. The key transformation must be 1-to-1, that is, every key in the
input DeltaSet is mapped to a unique key in the output DeltaSet. To ensure that the transformation
is 1-to-1 and to allow efficient incremental processing, mapKeys requires you to specify both the
key mapping function mapFn and the inverse of that function, inverseFn:
val partitioner = NameHashPartitioner(10)
val input: DeltaSet[Key, Int] = ???
val relayered: DeltaSet[Key, Int] =
input.mapKeys(
OneToOne(
mapFn = key => key.copy(layer = Layer.Id("outLayerA")),
inverseFn = key => key.copy(layer = Layer.Id("inLayerA"))
),
partitioner
)
See the copy-a-layer example for more details on how to use this operation.
If the DeltaSet contains a key x, for which inverseFn(mapFn(x)) != x, the transformation will
fail at run-time. The inverse function can be called on keys that are not produced by mapFn, so it
must return a correct result for any key supplied to it. inverseFn can be defined as a partial
function if it is defined only on a subset of the possible keys.
import scala.util.Try
val partitioner = HashPartitioner[String](context.defaultParallelism)
val input: DeltaSet[Int, String] = ???
val stringKeyed: DeltaSet[String, String] =
input.mapKeys(
OneToOne[Int, String](
mapFn = _.toString,
// Defined only on canonical integer strings such as "42" or "-7" (not "", "007",
// or "+7"), so that inverseFn(mapFn(x)) == x holds for every Int key x.
inverseFn = {
case s if Try(s.toInt).toOption.exists(_.toString == s) => s.toInt
}
),
partitioner
)
If it is inconvenient or impossible to specify the inverse transformation,
consider using the more expensive mapUnique transformation described in Transform keys and values.
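The round-trip requirement for mapKeys can be sketched in plain Scala. OneToOneFns and mapKeys below are hypothetical names that only imitate the shape of the library's OneToOne; the toy checks inverseFn(mapFn(k)) == k for every key and, as an assumption of this sketch, also treats a key outside the inverse's domain as a failure.

```scala
// Hypothetical stand-in for a key mapping plus its (possibly partial) inverse.
final case class OneToOneFns[K, K2](mapFn: K => K2, inverseFn: PartialFunction[K2, K])

// Toy mapKeys: renames every key and fails unless inverseFn(mapFn(k)) == k.
// A key whose image lies outside the inverse's domain fails as well.
def mapKeys[K, K2, V](data: Map[K, V], fns: OneToOneFns[K, K2]): Map[K2, V] =
  data.map { case (k, v) =>
    val k2 = fns.mapFn(k)
    require(fns.inverseFn.lift(k2).contains(k), s"inverseFn(mapFn($k)) != $k")
    (k2, v)
  }
```

With a digits-only inverse, a negative key such as -3 maps to "-3", which that inverse does not accept, so the toy rejects it; accepting exactly the canonical integer strings keeps the round trip intact.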
flatMapKeys is a transformation that maps each input key to zero or more output keys (1-to-many).
As with mapKeys, an inverse function must be passed to guarantee that each output key is the result
of exactly one input key.
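A similar sketch covers the 1-to-many case. The hypothetical flatMapKeys helper below copies each value to every output key and uses the inverse to verify that no output key could have come from two different inputs; keys are modeled as (layer, partition) tuples for brevity.

```scala
// Hypothetical toy model of flatMapKeys (1-to-many) with an inverse function.
// If two inputs produced the same output key, the inverse could map it back to
// only one of them, so the require below fails for the other.
def flatMapKeys[K, K2, V](data: Map[K, V])(mapFn: K => Iterable[K2])(
    inverseFn: K2 => K): Map[K2, V] =
  data.flatMap { case (k, v) =>
    mapFn(k).map { k2 =>
      require(inverseFn(k2) == k, s"output key $k2 does not map back to $k")
      k2 -> v
    }
  }
```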
When multiple input keys are mapped to the same output key (n-to-1 or m-to-n), the
values can either be grouped or reduced, just like in
key and value transformations. DeltaSets provide four transformations to cover
all combinations of grouping/reducing and n-to-1/m-to-n: mapKeysGroup and
flatMapKeysGroup group all values in a collection, whereas mapKeysReduce and flatMapKeysReduce
apply a reduce function to all values. See DirectMToNCompiler migration for an
example of how to use flatMapKeysGroup.
Filter data
filterByKey filters key-value pairs from a DeltaSet based only on their keys. This transformation
runs very efficiently and does not require any data exchange between nodes in the Spark cluster.
In the following snippet, the administrative_places layer of the hmc catalog is queried, and only
the partition keys with a generic partition name that starts with "1469256839" are kept. If the hmc
catalog ID points to the HERE Map Content catalog, this corresponds to reading all administrative
places in Australia.
import com.here.platform.data.processing.catalog.Partition.Generic
val filteredInput: DeltaSet[Key, Meta] =
context
.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("administrative_places"))
.filterByKey {
case Key(_, _, Generic(name)) =>
name.startsWith("1469256839")
case _ => false
}
For convenience, you can also use Partition Key Filters in a
filterByKey operation. For example, to keep only those partitions whose partition name is a HERE
tile inside a small bounding box in central Germany (around 51° N, 12° E), use the following
filterByKey operation:
val filteredInput: DeltaSet[Key, Meta] =
context
.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("road_attributes"))
.filterByKey(
BoundingBoxFilter(
south = 50.97656,
west = 11.95313,
north = 51.06445,
east = 12.04102
)
)
Partition Key Filters can also be defined in
the configuration file, under the path here.platform.data-processing.deltasets.partitionKeyFilters.
Partition key filters defined in the configuration file apply to all query
transformations and to readBack.
Join data
join is a DeltaSet transformation that takes two DeltaSets and produces a DeltaSet that
contains, for each key contained in both DeltaSets, the pair of values associated with the key in
each of the DeltaSets.
val integers: DeltaSet[Key, Int] = ???
val strings: DeltaSet[Key, String] = ???
val pairs: DeltaSet[Key, (Int, String)] = integers join strings
Other kinds of join transformations provided by DeltaSets are:
- outerJoin takes two DeltaSets and produces a DeltaSet that contains, for each key contained in either DeltaSet, the pair of values associated with the key in each of the DeltaSets. If a key has no associated value in one of the DeltaSets, the corresponding entry in the pair is None.
- leftOuterJoin takes two DeltaSets and produces a DeltaSet that contains, for each key contained in the left DeltaSet, the pair of values associated with the key in each of the DeltaSets. If a key has no associated value in the right DeltaSet, the corresponding entry in the pair is None.
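The three join flavors can be compared on plain Scala Maps. The join, outerJoin, and leftOuterJoin functions below are hypothetical in-memory analogues written for illustration; they reproduce only the result shapes described above, not the distributed or incremental behavior.

```scala
// Hypothetical in-memory analogues of the three join flavors.

// Inner join: only keys present in both inputs.
def join[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (A, B)] =
  left.collect { case (k, a) if right.contains(k) => (k, (a, right(k))) }

// Outer join: every key from either input; a missing side becomes None.
def outerJoin[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (Option[A], Option[B])] =
  (left.keySet ++ right.keySet).iterator.map(k => (k, (left.get(k), right.get(k)))).toMap

// Left outer join: every key from the left input; a missing right side becomes None.
def leftOuterJoin[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (A, Option[B])] =
  left.map { case (k, a) => (k, (a, right.get(k))) }
```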
The following example shows a common use case of joins: two layers, road_attributes and
topology_geometry, are queried from the catalog hmc (HERE Map Content). The keys in both resulting
DeltaSets are rewritten to refer to the same output catalog and output layer. Then, the
outerJoin is computed, resulting in a DeltaSet that contains the metadata for both layers. In
this way, the contents of the partitions can be correlated.
val topology: DeltaSet[Key, Meta] =
context
.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("topology_geometry"))
.mapKeys(
OneToOne(
_.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
_.copy(catalog = Catalog.Id("hmc"), layer = Layer.Id("topology_geometry"))
),
context.defaultPartitioner
)
val roadAttributes: DeltaSet[Key, Meta] =
context
.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("road_attributes"))
.mapKeys(
OneToOne(
_.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
_.copy(catalog = Catalog.Id("hmc"), layer = Layer.Id("road_attributes"))
),
context.defaultPartitioner
)
val pairs: DeltaSet[Key, (Option[Meta], Option[Meta])] =
topology outerJoin roadAttributes
Note that join is a stateful transformation, while outerJoin and leftOuterJoin
are not stateful, which can make the latter two more efficient.
Union data
disjointUnion is a DeltaSet transformation that takes two input DeltaSets and produces an output
DeltaSet that contains every key-value pair contained in either of the input DeltaSets. The
operation throws an exception if there is a key that is contained in both input DeltaSets.
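As a toy model of the disjointness check, the hypothetical disjointUnion helper below folds any number of Maps together and fails on the first overlapping key. Because Scala's immutable Map is covariant in its value type, Maps with different value types can be combined as Map[K, Any], which parallels how the value type of a DeltaSet union widens to a common supertype.

```scala
// Hypothetical toy model of disjointUnion over any number of in-memory Maps.
def disjointUnion[K, V](inputs: Seq[Map[K, V]]): Map[K, V] =
  inputs.foldLeft(Map.empty[K, V]) { (acc, next) =>
    // Fail as soon as a key appears in more than one input.
    val overlap = acc.keySet.intersect(next.keySet)
    require(overlap.isEmpty, s"keys present in more than one input: ${overlap.mkString(", ")}")
    acc ++ next
  }
```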
In the following snippet, you split an input DeltaSet into two DeltaSets with tiles at zoom levels 10
and 12, respectively. Then, each DeltaSet is transformed separately into strings using mapValues,
and the union of the results is stored in the variable combined.
val input: DeltaSet[Key, Meta] = ???
val tilesAt10: DeltaSet[Key, String] =
input
.filterByKey {
case Key(_, _, t: HereTile) => t.quad.getZoomLevel == 10
case _ => false
}
.mapValues(???)
val tilesAt12: DeltaSet[Key, String] =
input
.filterByKey {
case Key(_, _, t: HereTile) => t.quad.getZoomLevel == 12
case _ => false
}
.mapValues(???)
val combined: DeltaSet[Key, String] =
tilesAt10 disjointUnion tilesAt12
In the snippet above, both input DeltaSets have the exact same type. They can also have different
value types, as in the following snippet. In this case, the value type of the output DeltaSet is a
common supertype of value types of the inputs. In the following snippet, the union of
DeltaSet[Key, Int] and DeltaSet[Key, String] is typed as DeltaSet[Key, Any]. This works
because, in Scala, Any is a supertype of both String and Int.
val integers: DeltaSet[Key, Int] = ???
val strings: DeltaSet[Key, String] = ???
val union: DeltaSet[Key, Any] = integers disjointUnion strings
The context also provides a disjointUnion operation that combines two or more
DeltaSets at once:
val integers1: DeltaSet[Key, Int] = ???
val integers2: DeltaSet[Key, Int] = ???
val integers3: DeltaSet[Key, Int] = ???
val union: DeltaSet[Key, Int] =
context.disjointUnion(List(integers1, integers2, integers3))
Dynamically resolve references
mapValuesWithResolver can be used to transform a DeltaSet of key-meta pairs (the subjects) while
dynamically accessing other partitions (the references) during the transformation. It is an
alternative to static reference resolution using resolveReferences or a
RefTreeCompiler, both of which require pre-computing all
required references up-front. In contrast, dynamic reference resolution is more flexible, requires
fewer lines of code, and can be faster than static reference resolution, especially for complex
reference structures.
mapValuesWithResolver is similar to mapValuesWithKey; however, the mapping function that is
applied to each subject gets three arguments: a Resolver, which determines the metadata for any key
that may be referenced by the subject, and the key and metadata of the subject. Using
the metadata of the subject and the references, you can retrieve the
corresponding payloads.
The Resolver uses one or more ResolutionStrategy instances to find the metadata that corresponds to a
key. One such strategy is DirectQuery, which requests the metadata directly from the
Data API. This is simple
but requires one network request for each resolved key. The next section
describes three other resolution strategies that are more efficient because they make large sets of
metadata available at once.
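The idea of combining strategies can be sketched with a toy resolver. Strategy, Preloaded, CachedQuery, and Resolver are hypothetical names, not the library's types. The sketch assumes that strategies are consulted in order and that the first one that finds metadata wins; CachedQuery stands in for a per-key remote lookup whose results are cached, here with simple oldest-first eviction.

```scala
import scala.collection.mutable

// Hypothetical strategy: returns the metadata for a key, or None so that the
// next strategy in the list is consulted.
trait Strategy[K, M] { def lookup(key: K): Option[M] }

// Metadata that is already available locally, similar in spirit to a broadcast copy.
final class Preloaded[K, M](all: Map[K, M]) extends Strategy[K, M] {
  def lookup(key: K): Option[M] = all.get(key)
}

// One "remote" fetch per key; results (including misses) are cached, and the
// oldest entry is evicted once maxEntries is reached.
final class CachedQuery[K, M](fetch: K => Option[M], maxEntries: Int) extends Strategy[K, M] {
  private val cache = mutable.LinkedHashMap.empty[K, Option[M]]
  var requests = 0
  def lookup(key: K): Option[M] =
    cache.getOrElse(key, {
      requests += 1
      val result = fetch(key)
      if (cache.size >= maxEntries) cache.remove(cache.head._1)
      cache.put(key, result)
      result
    })
}

// Tries the strategies lazily, in order, and returns the first metadata found.
final class Resolver[K, M](strategies: Seq[Strategy[K, M]]) {
  def resolve(key: K): Option[M] =
    strategies.iterator.map(_.lookup(key)).collectFirst { case Some(m) => m }
}
```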
In the following snippet, you resolve references from partitions in layer A of catalog
inA to partitions in layer B of catalog inA. Each partition in layer A references the name
of a partition in layer B. Typically, for example when processing HERE Map Content, the
partition's name is stored with other data. However, for the purpose of this example, we
assume that there's no other information in partitions in layer A.
First, query layer A into the subjects DeltaSet. Then, call mapValuesWithResolver on it,
passing a mapping function and a strategies parameter, which is set to a list containing a DirectQuery.
Inside the mapping function:
- We retrieve the partition in layer A using the retriever.
- We convert the content of the partition to a string, referenceName, and construct a Key object referencing the referenceName partition in layer B.
- We use the resolver to retrieve the metadata corresponding to the reference.
- If the referenced partition does not exist, we throw an exception.
- Otherwise, we retrieve that partition.
val retriever = context.inRetriever(Catalog.Id("inA"))
val subjects: DeltaSet[Key, Meta] =
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("A"))
subjects.mapValuesWithResolver(
mapFn = {
case (resolver, key, meta) =>
// Retrieve and decode the payload. String construction is a placeholder for
// decoding the partition and getting a reference.
val referenceName = new String(retriever.getPayload(key, meta).content)
// Construct a key for the partition with "referenceName" in layer B.
val referenceKey = Key(Catalog.Id("inA"), Layer.Id("B"), Generic(referenceName))
// Try to find the metadata for `referenceKey`.
resolver.resolve(referenceKey) match {
case None => throw new Exception("Partition does not exist!")
case Some(referenceMeta) =>
// Retrieve the referenced partition.
val referencePartition = retriever.getPayload(referenceKey, referenceMeta)
??? // TODO: Do something with the partition
}
},
strategies = List(DirectQuery(Catalog.Id("inA"), Set(Layer.Id("B"))))
)
Resolution strategies
Four resolution strategies are currently available:
- DirectQuery: Given a catalog and a set of layers, this strategy retrieves the metadata individually for each key via the Data API. The result of each request is cached per executor. The size of the cache can be configured by passing an argument to the constructor of DirectQuery; the default is 10000 metadata objects (around 3 MB).
- Broadcast: Given a DeltaSet containing metadata, this strategy sends a complete copy of the metadata to each Spark executor, so that the whole metadata is available on each executor without further network requests. Internally, this strategy uses a Spark broadcast variable. Depending on the amount of metadata, this may require a large amount of memory in each executor: storing the metadata takes roughly 300 bytes per partition in the DeltaSet.
- BackwardResolution: Suppose you have a DeltaSet containing references and a function that maps each reference to a set of subject partitions. BackwardResolution exposes these references to the resolver when processing the subject partition, without any further network queries. For example, in the following snippet each partition of layer A is grouped with all of its children in layer B.
BackwardResolution(
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")), { (key, meta) =>
Set(key.copy(layer = Layer.Id("A"), partition = key.partition.parent.get))
}
)
The predefined backward resolution strategy BackwardResolution.toSamePartition groups each
subject partition with the reference partition of the same name. Similarly,
BackwardResolution.toNeighbors groups each subject tile with all its neighbor tiles at a
given depth.
- ForwardResolution: Suppose you have a DeltaSet containing references and a function that maps each subject to a set of reference partitions. ForwardResolution exposes these references to the resolver when processing the subject partition, without any further network queries. This strategy is the inverse of BackwardResolution. ForwardResolution takes the key and value types of the subject DeltaSet as type parameters. The following snippet shows how to group each subject tile with all of its neighbor tiles.
ForwardResolution[Key, Meta](
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")), {
case (key @ Key(_, _, tile: HereTile), meta) =>
tile
.neighbors(1)
.map(tile => key.copy(layer = Layer.Id("B"), partition = tile))
case _ => ???
}
)
mapValuesWithResolver takes a list of resolution strategies, allowing them to be combined
sequentially. Consider, for example, the resolution strategy list in the following snippet. When
processing partitions of layer A and resolving references to tiles in layer B, all direct
neighbors of the currently processed tile will be available without network request. To resolve
other tiles in layer B (either farther away or at a different zoom level), a network request will
be performed. To resolve references to layer C, we will always use network requests, while
references in layer D will be resolved using a broadcast.
val strategies =
List(
BackwardResolution.toNeighbors(
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")),
Catalog.Id("inA"),
Layer.Id("A")
),
DirectQuery(Catalog.Id("inA"), Set(Layer.Id("B"), Layer.Id("C"))),
Broadcast(context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("D")))
)
mapValuesWithResolver can be applied to DeltaSet[Key, Meta], in which case the partitioner of
the subject DeltaSet is also used to partition references. If mapValuesWithResolver is
applied to a DeltaSet with a different key or value type, a partitioner for the references must
be provided. For example, in this snippet, a DeltaSet[HereTile, String] is transformed.
val deltaSet: DeltaSet[HereTile, String] = ???
deltaSet.mapValuesWithResolver(
(r, k, s) => ???,
Seq(
ForwardResolution[HereTile, String](
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")), {
case (tile, string) =>
tile
.neighbors(1)
.map(tile => Key(Catalog.Id("inA"), Layer.Id("B"), tile))
}
)),
HashPartitioner(12)
)
Statically resolve references
resolveReferences is a transformation that implements the same functionality as the first step of
a RefTreeCompiler: given a reference relation between
catalog partitions, the transformation groups each partition in a DeltaSet with its references. The
reference relation is defined by:
- a RefTree, which specifies the types of references that may exist between partitions
- a resolve function, which computes the concrete set of partition keys referenced by a partition
For more information, see RefTreeCompiler.
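A toy version of static resolution on in-memory Maps follows. The resolveReferences helper here is hypothetical and simplified: its resolve function returns a plain set of keys instead of references grouped by RefName, and references that do not exist are silently dropped, which is an assumption of this sketch.

```scala
// Hypothetical toy model of static reference resolution. resolveFn lists the keys
// a subject refers to; each subject is paired with the metadata of those
// referenced keys that exist in `all` (missing references are dropped).
def resolveReferences[K, M](subjects: Map[K, M], all: Map[K, M])(
    resolveFn: (K, M) => Set[K]): Map[K, (M, Map[K, M])] =
  subjects.map { case (k, m) =>
    val refs: Map[K, M] = resolveFn(k, m).flatMap(r => all.get(r).map(meta => (r, meta))).toMap
    (k, (m, refs))
  }
```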
In the following snippet, resolveReferences is used to group each key-meta pair in deltaSet1
whose partition name is a HERE tile with the key-meta pairs of all of its neighbors.
import com.here.platform.data.processing.compiler.reftree._
val deltaSet1: DeltaSet[Key, Meta] = ???
val deltaSet2: DeltaSet[Key, (Meta, Map[Key, Meta])] =
deltaSet1.resolveReferences(
RefTree(
Subject((Catalog.Id("inCatalogA"), Layer.Id("inLayerA")),
Ref(RefTree.RefName("neighbor"),
(Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))))),
resolveFn = {
case (Key(catalog, layer, partition: HereTile), meta) =>
val neighbor = partition.neighbors(radius = 1) - partition
Map(RefTree.RefName("neighbor") -> neighbor.map(neighborTile =>
Key(catalog, layer, neighborTile)))
case _ => Map.empty
}
)
Note
resolveReferences behaves differently from the reference resolution step in a RefTreeCompiler in one detail: if Partition Key Filters are defined in the configuration file, they apply to both references and subjects. In RefTreeCompiler, only the subjects are filtered.
Read back published data
For a PublishedSet, readBack is the only transformation that you can apply. It turns the
PublishedSet, which is the result of publishing partitions to a layer, into a DeltaSet containing
the key and metadata of all partitions contained in that layer after the publishing. This way,
partitions that were published in an earlier processing step can be read back and used in the
following steps.
In the code snippet below, we are using readBack to read an intermediate result after it has
been published to an output catalog layer. We then combine the result of readBack using
disjointUnion with a layer from the input catalog hmc (HERE Map Content) and
resolve references between these layers.
val intermediate: DeltaSet[Key, Payload] = ???
val intermediatePublished: PublishedSet = intermediate.publish(Set(Layer.Id("intermediate")))
val intermediateAndTopology: DeltaSet[Key, Meta] =
intermediatePublished.readBack() disjointUnion
context.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("topology_geometry"))
val references: DeltaSet[Key, (Meta, Map[Key, Meta])] =
intermediateAndTopology.resolveReferences(
RefTree(
Subject((Default.OutCatalogId, Layer.Id("intermediate")),
Ref(RefTree.RefName("intermediate_to_topology"),
(Catalog.Id("hmc"), Layer.Id("topology_geometry"))))),
resolveFn = ???
)
Convert RDDs to DeltaSets
toDeltaSet is an operation that can be used to convert a Spark RDD into a DeltaSet. This can be
used, for example, for ingesting data from other sources using Spark and integrating it into a
processing pipeline that uses DeltaSets. The RDD must contain key-value pairs, and it must not contain
more than one pair with the same key. A partitioner to repartition the RDD must be passed to
toDeltaSet. Unless the RDD is already partitioned with the given partitioner, the repartitioning
of the RDD causes a shuffle.
The resulting DeltaSet does not contain any information about changes since the last run of the pipeline. Therefore, even in an incremental run of the pipeline, downstream DeltaSet transformations process all data in the DeltaSet.
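Before conversion, the data must hold at most one pair per key. The following plain-Scala sketch, with a hypothetical keyByFirstColumn helper, shows that check on CSV lines keyed by their first column; it runs on a local collection and does not use Spark.

```scala
// Hypothetical helper: keys each CSV line by its first column and rejects
// input in which the same key occurs more than once.
def keyByFirstColumn(lines: Seq[String]): Map[String, Vector[String]] = {
  val pairs = lines.map { line =>
    val columns = line.split(",").map(_.trim).toVector
    columns.head -> columns.tail
  }
  val duplicated = pairs.groupBy(_._1).collect { case (k, ps) if ps.size > 1 => k }
  require(duplicated.isEmpty, s"duplicate keys: ${duplicated.mkString(", ")}")
  pairs.toMap
}
```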
In the following snippet, we show how to ingest a CSV file via Spark RDDs into a DeltaSet. We are using
the SparkContext, which is accessible via the DeltaContext, to read a CSV file airports.csv.
Then, we convert the RDD into a key-value form, where the first column of the CSV file serves as a
key. Finally, we use toDeltaSet to convert the RDD to a DeltaSet.
val sc = context.driverContext.spark
val rowsByFirstColumn: RDD[(Key, Array[String])] =
sc.textFile("airports.csv").map { x =>
val columns = x.split(",").map(_.trim)
(Key(Catalog.Id("outCatalog"), Layer.Id("outLayer"), Generic(columns(0))),
columns.drop(1))
}
val deltaSet: DeltaSet[Key, Array[String]] =
rowsByFirstColumn.toDeltaSet(context.defaultPartitioner)
Spark partitions and shuffles
The data in a DeltaSet is always partitioned according to a specific
partitioner that assigns each key in the DeltaSet to a Spark
partition and, thereby, to a node in the cluster where the data resides. The partitioner is always
preserved during DeltaSet transformations unless a new partitioner is explicitly specified in a
transformation. In particular, the repartition transformation does nothing but change the
partitioner and repartition the data according to the new partitioner.
All transformations that potentially transform the keys in a DeltaSet or change the partitioner require repartitioning the data and may, therefore, move data between the nodes in the cluster, which is called shuffling. Shuffling is an expensive operation and should be avoided where possible. The table in Performance properties shows which DeltaSet transformations may need to shuffle data.
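To make the cost of a key-changing transformation concrete, the following plain-Scala sketch counts how many records would have to move to a different partition when their keys are rewritten. The hash-modulo partitioner and the names `ToyShuffle` and `movedRecords` are illustrative assumptions, not part of the library.

```scala
object ToyShuffle {
  // Hypothetical partitioner: assigns a key to one of numPartitions partitions.
  def partitionOf(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Counts the records whose partition changes when keyFn rewrites their key.
  // In Spark, each such record would be sent to another node during a shuffle.
  def movedRecords(keys: Seq[String], keyFn: String => String, numPartitions: Int): Int =
    keys.count(k => partitionOf(k, numPartitions) != partitionOf(keyFn(k), numPartitions))
}
```

A key function that never changes a key's partition moves nothing; that is the situation the PreservesPartitioning strategy described next relies on.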
Partitioning strategies
Each transformation that shuffles data requires you to explicitly provide a partitioning
strategy for the resulting keys. This partitioning strategy can either be a partitioner or the
PreservesPartitioning special value.
When you use a partitioner, the transformation uses that partitioner to repartition the result.
If there are no performance requirements for your transformation, you can use the
defaultPartitioner field in the DeltaContext.
When a transformation changes the keys in a DeltaSet but each key remains in the same Spark
partition, you can use PreservesPartitioning. This strategy prevents the transformation from
shuffling data altogether, resulting in significant performance improvements. If the transformation
cannot preserve the partitioning, an exception is thrown at runtime.
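The choice between an explicit partitioner and PreservesPartitioning can be modeled as a small algebraic data type. In this hypothetical sketch, `ToyStrategy`, `UsePartitioner`, `ToySet`, and `mapKeys` are stand-ins invented for illustration; the real library types and their checks may differ. The sketch repartitions when given a partitioner and, for PreservesPartitioning, throws if any new key would not belong to its current partition, similar to the runtime exception described above.

```scala
object ToyStrategy {
  // Hypothetical stand-ins for the two partitioning strategies (not library types).
  sealed trait PartitioningStrategy
  final case class UsePartitioner(partitionOf: String => Int) extends PartitioningStrategy
  case object PreservesPartitioning extends PartitioningStrategy

  // Toy data set: a partitioner plus the keys stored in each partition.
  final case class ToySet(partitionOf: String => Int, partitions: Map[Int, Set[String]])

  def mapKeys(set: ToySet, keyFn: String => String, strategy: PartitioningStrategy): ToySet =
    strategy match {
      case UsePartitioner(newPartitionOf) =>
        // Repartition all new keys with the given partitioner; in Spark this is a shuffle.
        val newKeys = set.partitions.values.flatten.map(keyFn).toSet
        ToySet(newPartitionOf, newKeys.groupBy(newPartitionOf))
      case PreservesPartitioning =>
        // No shuffle: each new key must still belong to the partition it is already in.
        val checked = set.partitions.map { case (p, keys) =>
          val newKeys = keys.map(keyFn)
          newKeys.find(k => set.partitionOf(k) != p).foreach { k =>
            throw new IllegalStateException(s"key $k does not belong to partition $p")
          }
          (p, newKeys)
        }
        ToySet(set.partitionOf, checked)
    }
}
```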
In the copy-a-layer example, we use mapKeys to change the catalog and layer of the
keys in a DeltaSet. It is partitioned with the defaultPartitioner, a PartitionNamePartitioner,
which groups all catalog partitions with the same name in the same Spark partition, irrespective of
the catalog and layer. Consequently, all catalog partitions remain within the same Spark partition
and we can use PreservesPartitioning.
The PreservesPartitioning partitioning strategy can even be used when the key types for the
upstream and downstream DeltaSets are not the same, as long as the upstream DeltaSet is
partitioned by a partitioner that is general enough to also handle the downstream key type. This
advanced feature allows you, for example, to map a DeltaSet with Partition.Key keys to a DeltaSet with
Partition.Name keys without causing a Spark shuffle:
// Query data from a catalog and change the key type from Partition.Key to Partition.Name
val catalog: Catalog.Id = ???
val layer: Layer.Id = ???
context
.queryCatalogLayer(catalog, layer)
.mapKeys(
OneToOne[Key, Name](_.partition, Key(catalog, layer, _)),
PreservesPartitioning
)
Here, PreservesPartitioning can be used because the default partitioner used by
queryCatalogLayer is a PartitionNamePartitioner that supports both the upstream key type,
Partition.Key, and the downstream key type, Partition.Name.
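Why this works can be sketched in plain Scala: if the partitioner looks only at the partition name, a key keeps its Spark partition both when its catalog and layer change (the copy-a-layer case) and when it is replaced by its bare name. The types `ToyKey`, `ToyName`, and `NameOnlyPartitioner` below are hypothetical simplifications of Partition.Key, Partition.Name, and PartitionNamePartitioner, not their actual definitions; `getPartition(key: Any): Int` only mirrors the shape of Spark's `Partitioner` method.

```scala
object ToyNamePartitioning {
  // Hypothetical simplified key types.
  final case class ToyName(value: String)
  final case class ToyKey(catalog: String, layer: String, partition: ToyName)

  // A partitioner general enough for both key types: it only looks at the partition name,
  // ignoring catalog and layer.
  final class NameOnlyPartitioner(numPartitions: Int) {
    def getPartition(key: Any): Int = key match {
      case ToyKey(_, _, name) => getPartition(name)
      case ToyName(value)     => Math.floorMod(value.hashCode, numPartitions)
      case other              => throw new IllegalArgumentException(s"unsupported key: $other")
    }
  }
}
```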
Laziness and persist data
Whenever a DeltaSet is used in two or more transformations, its result should be persisted in the
memory or on the disk of the Spark workers, to avoid recomputing the result twice or more. This is
explained in more detail for RDDs in RDD Persistence
policy and applies equally to DeltaSets. Use
the transformation persist to persist a DeltaSet for reuse, as in the following example:
import org.apache.spark.storage.StorageLevel
val deltaSet1: DeltaSet[Key, Int] = ???
val doubled = deltaSet1
.mapValues(_ * 2)
.persist(StorageLevel.MEMORY_AND_DISK_2)
val reuse1 =
doubled
.mapValues(x => Payload(BigInt(x).toByteArray))
.publish(Set(Layer.Id("outLayer1")))
val reuse2 =
doubled
.mapValues(x => Payload(BigInt(x / 2).toByteArray))
.publish(Set(Layer.Id("outLayer2")))
Performance properties
The internal implementations of the transformations vary in their time and space costs, which are not directly visible from the outside. In particular, two properties make certain transformations more expensive than others: shuffling and statefulness.
A transformation that shuffles data moves data between nodes in the cluster, which consumes network bandwidth and slows down the computation.
A transformation that is stateful has to store auxiliary information in the state layer of the
output catalog to allow incremental computations. This means that extra storage is consumed in the
output catalog, extra time is required to compute the state, and additional RAM is required on the
nodes of the cluster to persist the state until the end of the computation.
Using stateful or shuffling operations is inevitable in most pipelines. However, the following table can help you avoid stateful and shuffling transformations wherever possible.
| Operation | Shuffles? | Stateful? |
|---|---|---|
| detectChanges | no | yes |
| disjointUnion | no | no |
| filterByKey | no | no |
| flatMapGroup | yes¹ | yes |
| flatMapKeys | yes¹ | no |
| flatMapKeysGroup | yes¹ | no |
| flatMapKeysReduce | yes¹ | no |
| flatMapReduce | yes¹ | yes |
| flatMapUnique | yes¹ | yes |
| join | no | yes |
| leftOuterJoin | no | no |
| mapGroup | yes¹ | yes |
| mapKeys | yes¹ | no |
| mapKeysGroup | yes¹ | no |
| mapKeysReduce | yes¹ | no |
| mapReduce | yes¹ | yes |
| mapUnique | yes¹ | yes |
| mapValues | no | no |
| mapValuesWithKey | no | no |
| mapValuesWithResolver | yes | yes |
| outerJoin | no | no |
| persist | no | no |
| publish | no | no |
| publishPart | no | no |
| readBack | no | no |
| repartition | yes | no |
| resolveReferences | yes | yes |
| toDeltaSet | yes | no |
To avoid the overhead incurred by stateful transformations, use stateless alternatives
wherever possible. For example, you can often use the stateless outerJoin instead of the stateful
join, or transform keys and values separately with the stateless mapKeysGroup and mapValues
transformations instead of using the stateful mapGroup.
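One reason an outerJoin can often replace a join is that the inner-join result can be recovered from the outer one by keeping only the entries present on both sides. The plain-Scala sketch below models only these semantics on `Map`s; `ToyJoins` is a hypothetical name, and the result type `(Option[A], Option[B])` is an assumption that need not match the library's outerJoin signature.

```scala
object ToyJoins {
  // Inner join: only keys present on both sides.
  def join[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (A, B)] =
    left.collect { case (k, a) if right.contains(k) => (k, (a, right(k))) }

  // Full outer join: every key from either side, with missing values as None.
  def outerJoin[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (Option[A], Option[B])] =
    (left.keySet ++ right.keySet).iterator.map(k => (k, (left.get(k), right.get(k)))).toMap

  // Recovers the inner join from the outer join by dropping one-sided entries.
  def innerFromOuter[K, A, B](outer: Map[K, (Option[A], Option[B])]): Map[K, (A, B)] =
    outer.collect { case (k, (Some(a), Some(b))) => (k, (a, b)) }
}
```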
The forceStateless configuration option can force any transformation
to be stateless. This will avoid the overhead incurred by the state, but the transformation will
not be able to track dependencies between partitions during an incremental run. In practice,
this means that in an incremental run, the DeltaSet has to process all upstream partitions,
whether they changed or not, unless it can determine that the upstream DeltaSet did not change
at all. In the latter case, no processing needs to be done, no matter the value of forceStateless.
A notable exception is detectChanges, which can still process only the changed part of the
upstream DeltaSet even when forceStateless is set. However, without state, detectChanges cannot
reduce the set of changed partitions, so setting forceStateless effectively disables it.
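The difference in an incremental run can be modeled in plain Scala. In this hypothetical sketch, a stateful transformation keeps a dependency map from output keys to the upstream keys they were computed from, so it reprocesses only what a change affects; a stateless one can only choose between processing all upstream partitions and doing nothing. `ToyIncremental` and `upstreamToProcess` are illustrative names, not library API.

```scala
object ToyIncremental {
  // dependencies: output key -> upstream keys it was computed from. This stands in for the
  // kind of bookkeeping a stateful transformation keeps between runs.
  def upstreamToProcess(
      allUpstream: Set[String],
      dependencies: Map[String, Set[String]],
      changedUpstream: Set[String],
      stateless: Boolean): Set[String] =
    if (changedUpstream.isEmpty) Set.empty // upstream unchanged: nothing to do in either mode
    else if (stateless) allUpstream // no state: every upstream partition is processed again
    else {
      // With state: recompute only outputs that depend on a changed input. This needs all inputs
      // of those outputs, plus changed inputs that no known output depends on yet.
      val affectedOutputs = dependencies.collect {
        case (out, ins) if ins.exists(changedUpstream.contains) => out
      }
      affectedOutputs.flatMap(out => dependencies(out)).toSet ++ changedUpstream
    }
}
```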
Comparison and migration
This section compares DeltaSets to the other ways of expressing distributed computation in the Data Processing Library and provides help to migrate to DeltaSets from less flexible interfaces.
Functional patterns
If you are a previous user of Functional Patterns, you can
understand DeltaSets as the way to build custom compilation patterns, which means that you can
have compilers with as many resolveFn, compileInFn, and compileOutFn functions as required by
your particular application. However, DeltaSets provide many more ways of structuring the
computation – for example, you can reuse the result of one compileInFn in several compileOutFns,
you can join the results of several compileInFns, and you can publish intermediate results to
the output catalog.
Migrate a MapGroupCompiler to DeltaSets
A MapGroupCompiler, expressed in DeltaSet transformations, corresponds to:
- applying a compileIn function to all key-meta pairs in the inLayers, grouping the result using flatMapGroup
- transforming the groups of intermediate data into payloads using the compileOut function
- publishing the result
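The three steps can first be traced on plain Scala collections. This is a hypothetical model only: `compileIn` and `compileOut` are toy functions over strings and integers, and `groupBy` stands in for the grouping (and shuffle) that flatMapGroup performs in a cluster.

```scala
object ToyMapGroup {
  // Step 1 (compileIn + flatMapGroup): each input pair emits (outputKey, intermediate) pairs,
  // which are then grouped by output key. Hypothetical rule: group by the key's first character.
  def compileIn(key: String, value: Int): Iterable[(String, Int)] =
    Seq((key.take(1), value))

  // Step 2 (compileOut): turn one group of intermediate data into an output "payload".
  def compileOut(key: String, group: Iterable[Int]): String =
    s"$key:${group.sum}"

  // Step 3 (publish) is represented by returning the final key-payload map.
  def run(input: Map[String, Int]): Map[String, String] =
    input.toSeq
      .flatMap { case (k, v) => compileIn(k, v) }
      .groupBy(_._1)
      .map { case (outKey, kvs) => (outKey, compileOut(outKey, kvs.map(_._2))) }
}
```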
The full example is shown below. TODO tags identify the places where you define your intermediate
data, input layers, and the MapGroupCompiler.
import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, MapGroupCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
object MapGroupMain extends DeltaSimpleSetup {
case class IntermediateData() // TODO: Define the intermediate data of the compiler
def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
??? // TODO: Define the input layers of the compiler
def constructMapGroupCompiler(retrievers: Map[Catalog.Id, Retriever])
: MapGroupCompiler[IntermediateData] with CompileOut1To1Fn[IntermediateData] =
??? // TODO: Construct the compiler
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
import context.transformations._
val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
val compiler = constructMapGroupCompiler(retrievers)
// If the compiler defined a particular partitioner, use it. Otherwise, use same partitioner
// used for partitioning queried metadata (context.defaultPartitioner), a reasonable default.
val partitioner =
compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)
val result =
context
.queryCatalogs(
inLayers
)
.flatMapGroup(
Function.untupled(compiler.compileInFn),
partitioner
)
.mapValuesWithKey(
compiler.compileOutFn
)
.publish(
compiler.outLayers
)
Iterable(result)
}
}
Migrate a RefTreeCompiler to DeltaSets
A RefTreeCompiler, expressed in DeltaSet transformations, corresponds to:
- grouping a set of subject partitions with their references using resolveReferences
- applying a compileIn function to all subject-reference pairs, and grouping the result using flatMapGroup
- transforming the groups of intermediate data into payloads using the compileOut function
- publishing the result
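The reference-resolution step is what distinguishes this pattern from a MapGroupCompiler. As a hypothetical plain-Scala model (not the library's RefTree machinery), resolving references pairs each subject value with a map of the values it references, the same shape as the `(Meta, Map[Key, Meta])` values produced by resolveReferences in the readBack example. The names `ToyResolveReferences` and `refsOf` are assumptions.

```scala
object ToyResolveReferences {
  // subjects: the partitions being compiled; referencedLayer: partitions they may point to.
  // refsOf: hypothetical function listing the keys a subject refers to.
  def resolveReferences[K, V](
      subjects: Map[K, V],
      referencedLayer: Map[K, V],
      refsOf: (K, V) => Set[K]): Map[K, (V, Map[K, V])] =
    subjects.map { case (k, v) =>
      // Keep only references that actually exist in the referenced layer.
      val resolved = refsOf(k, v).flatMap(r => referencedLayer.get(r).map(rv => (r, rv))).toMap
      (k, (v, resolved))
    }
}
```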
The full example is shown below. TODO tags identify the places where you define your intermediate
data, input layers, and the RefTreeCompiler:
import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.reftree.CompileInFnWithRefs
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, RefTreeCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
object RefTreeMain extends DeltaSimpleSetup {
case class IntermediateData() // TODO: Define the intermediate data of the compiler
def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
??? // TODO: Define the input layers of the compiler
def constructTestRefTreeCompiler(
retrievers: Map[Catalog.Id, Retriever]): RefTreeCompiler[IntermediateData]
with CompileInFnWithRefs[IntermediateData]
with CompileOut1To1Fn[IntermediateData] =
??? // TODO: Construct the compiler
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
import context.transformations._
val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
val compiler = constructTestRefTreeCompiler(retrievers)
// If the compiler defined a particular partitioner, use it. Otherwise, use same partitioner
// used for partitioning queried metadata (context.defaultPartitioner), a reasonable default.
val partitioner =
compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)
Iterable(
context
.queryCatalogs(
compiler.inLayers
)
.resolveReferences(
compiler.refStructure,
Function.untupled(compiler.resolveFn)
)
.flatMapGroup(
{ case (k, (v, refs)) => compiler.compileInFn((k, v), refs) },
partitioner
)
.mapValuesWithKey(
compiler.compileOutFn
)
.publish(
compiler.outLayers
)
)
}
}
Migrate Direct 1:N and M:N Compilers to DeltaSets
A Direct M:N Compiler, expressed in DeltaSet transformations, corresponds to:
- computing the intermediate data for each input key-value pair using compileIn and mapValuesWithKey
- mapping all input key-value pairs to the corresponding output keys using mappingFn and flatMapKeysGroup. Here, the inverse of mappingFn must additionally be specified.
- transforming the groups of intermediate data into payloads using the compileOut function
- publishing the result
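Because ManyToMany takes both mappingFn and its inverse, a unit test can catch an inverse that does not match. The hypothetical check below verifies the consistency property one would expect: whenever mappingFn sends an input key to an output key, the inverse sends that output key back to the input key. It uses strings instead of Partition.Key, and `ToyManyToMany` is not part of the library.

```scala
object ToyManyToMany {
  // Returns the (input, output) pairs for which the inverse does not map back to the input.
  def inconsistencies[K](
      inputs: Iterable[K],
      mapping: K => Iterable[K],
      inverse: K => Iterable[K]): Seq[(K, K)] =
    (for {
      in  <- inputs.iterator
      out <- mapping(in).iterator
      if !inverse(out).iterator.contains(in)
    } yield (in, out)).toList
}
```

An empty result means every edge produced by the mapping is also reachable through the inverse.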
The full example is shown below. TODO tags identify the places where you define your intermediate
data, input layers, the inverse of mappingFn and the DirectMToNCompiler:
import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.direct.CompileInFn
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, DirectMToNCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
object DirectMToNMain extends DeltaSimpleSetup {
case class IntermediateData() // TODO: Define the intermediate data of the compiler
def inverseMappingFn: Partition.Key => Iterable[Partition.Key] =
??? // TODO: Define the inverse of the compiler's mappingFn
def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
??? // TODO: Define the input layers of the compiler
def constructTestDirectMToNCompiler(retrievers: Map[Catalog.Id, Retriever]): DirectMToNCompiler[
IntermediateData] with CompileInFn[IntermediateData] with CompileOut1To1Fn[IntermediateData] =
??? // TODO: Construct the compiler
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
import context.transformations._
val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
val compiler = constructTestDirectMToNCompiler(retrievers)
// If the compiler defined a particular partitioner, use it. Otherwise, use same partitioner
// used for partitioning queried metadata (context.defaultPartitioner), a reasonable default.
val partitioner =
compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)
Iterable(
context
.queryCatalogs(
compiler.inLayers
)
.mapValuesWithKey {
case (k, v) =>
compiler.compileInFn((k, v))
}
.flatMapKeysGroup(
ManyToMany(
compiler.mappingFn,
inverseMappingFn
),
partitioner
)
.mapValuesWithKey(
compiler.compileOutFn
)
.publish(
compiler.outLayers
)
)
}
}
A Direct1toNCompiler can be expressed using a similar sequence of transformations, replacing
flatMapKeysGroup with flatMapKeys.
Spark RDD-based patterns
Like an RDD, a DeltaSet represents data that is distributed over a cluster of machines and that can be
transformed using a set of functional operations like filterByKey or mapValues. In fact, a
DeltaSet internally uses an RDD to represent that data. However, there are three main differences
between using DeltaSets and using RDDs directly:
- Key-Value: In contrast to an RDD, a DeltaSet contains only key-value pairs and never contains duplicate keys. That is why, for example, there is no simple map operation available for DeltaSets, as it may create duplicate keys.
- Strongly partitioned: A DeltaSet is always partitioned according to a specific partitioner that assigns each key-value pair to one Spark partition, which in turn is stored on a specific machine in the cluster. An RDD, on the other hand, can store data without a defined partitioner.
- Incremental: Every computation expressed with a DeltaSet can be executed incrementally, without manually keeping track of dependencies as done in the DepCompiler and IncrementalDepCompiler.
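The key-value restriction is easy to see with an ordinary Scala `Map`, which, like a DeltaSet, holds at most one value per key: mapping over the keys with a function that is not injective silently drops entries. The helper `mapKeysStrict` below is hypothetical and rejects such collisions instead of losing data; neither helper is part of the DeltaSet API.

```scala
object ToyUniqueKeys {
  // A plain map over a Scala Map keeps only one value per resulting key.
  def mapKeysLossy[K, K2, V](m: Map[K, V], f: K => K2): Map[K2, V] =
    m.map { case (k, v) => (f(k), v) }

  // Hypothetical strict variant: fails if two input keys map to the same output key.
  def mapKeysStrict[K, K2, V](m: Map[K, V], f: K => K2): Map[K2, V] = {
    val collisions = m.keys.groupBy(f).filter(_._2.size > 1)
    require(collisions.isEmpty, s"colliding keys: $collisions")
    mapKeysLossy(m, f)
  }
}
```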
Multi-compiler tasks
A multi-compiler task can chain the effect of several compilers by
uploading the result of one compiler to a catalog layer and downloading the content of that catalog
layer in another compiler. DeltaSets can also chain the effect of several compilers. However, you
can decide whether intermediate results should be published and read back from the output catalog,
as done in a multi-compiler task, or not. For more information on how to emulate a multi-compiler
task using DeltaSets, see the readBack transformation.
IDs and configuration
Sometimes it is useful to change the behavior of a DeltaSet transformation from a configuration
file, for example, to tune the performance parameters of transformations without recompiling the
application. Each transformation on a DeltaSet has a unique ID, used to identify every
transformation in the configuration file. To configure a transformation from the configuration file,
assign an ID to it by calling the withId function, which sets the ID of the transformation
immediately preceding it.
val deltaSet1: DeltaSet[Key, Int] = ???
val doubled: DeltaSet[Key, Int] = deltaSet1.mapValues(_ * 2).withId("doublingMap")
Transformations that you do not assign an ID in this way are automatically assigned an ID based on
the order in which they appear in the source code. To determine the ID, call the id method on the
DeltaSet that results from the transformation.
IDs are not only assigned to DeltaSets, but also to PublishedSets. Both share the functionality
for identifying and configuring the classes in the common super-class BaseSet.
For establishing modularity within the pipeline, it can be useful to wrap a group of BaseSets
into a common namespace. IDs then only need to be unique within this namespace, and the namespace
is included in logging messages and the named RDDs in Spark UI. Use BaseSet.Namespace.enter to
enter a new namespace. Namespaces can be nested.
val deltaSet1: DeltaSet[Key, Int] = ???
BaseSet.Namespace.enter("routingModule") {
val doubled: DeltaSet[Key, Int] = deltaSet1
.mapValues(_ * 2)
.withId("doublingMap")
}
DeltaSets can be configured by adding a section to the pipeline's application.conf, as
described in Configuring the Library. You can configure both the defaults that
apply to all transformations and each transformation individually. The snippet below shows the
default configuration and describes each option that is defined in the
here.platform.data-processing.deltasets.default. To change the default configuration, copy this
snippet into your application.conf and change the values accordingly.
// Configures the default settings for DeltaSet operations.
here.platform.data-processing.deltasets.default {
// Where to store intermediate results that must be persisted. See
// https://spark.apache.org/docs/latest/rdd-programming-guide.html#which-storage-level-to-choose
// Applies to: flatMapReduce, flatMapGroup, mapReduce, mapGroup, mapValues, mapValuesWithKey,
// resolve, detectChanges, join.
intermediateStorageLevel = "MEMORY_AND_DISK"
// Defines how much extra processing should be done to detect invalid use of DeltaSets and
// improve debugging.
//
// Possible values:
//
// PERFORMANCE: Disables certain potentially expensive checks that help detect incorrect
// uses of DeltaSets as early as possible. When this setting is used it can be harder to find
// root causes of a problem. In particular:
// - disjointUnion does not validate that the union does not create duplicate keys.
// - toDeltaSet does not validate that the given RDD does not contain duplicate keys.
//
// SAFETY: Enable validation checks. Default.
//
// DEBUG: Like "SAFETY", but also enable processing to improve debug output. In particular:
// - Logical errors, such as IllegalArgumentException, are not thrown immediately during a
// stage that invokes user-defined functions. Instead, they are collected in a container in
// the driver, and thrown only once.
// Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
// mapGroup/flatMapGroup, mapValues/mapValuesWithKey and resolve.
validationLevel = "SAFETY"
// Number of threads to use for running user-provided functions, for example mapFn and resolveFn.
// Increasing this number is useful when you have blocking I/O happening inside the function.
// Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
// mapGroup/flatMapGroup, mapValues/mapValuesWithKey and resolve.
threads = 1
// Processes the keys within a partition in sorted order. Sorting ensures that keys with
// similar names are processed together. This can improve the performance if the map function
// caches resources, such as downloaded payloads, and the same cache entries are likely to be
// requested while processing keys that are nearby in the sorting order.
// Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
// mapGroup/flatMapGroup and mapValues/mapValuesWithKey.
sorting = false
// Can be used to disable incremental computation of a DeltaSet. This will cause upstream
// and downstream DeltaSets to also be computed non-incrementally.
// Applies to: all transformations except publish
incremental = true
// Can be used to disable stateful computation of a DeltaSet. No state will be generated
// for the DeltaSet, and the DeltaSet cannot track changes during incremental runs, that is,
// it will either process all or none of the input partitions.
// Applies to: all stateful transformations
forceStateless = false
}
You can configure a transformation with the ID id in the
here.platform.data-processing.deltasets.id path of the configuration. For example, the
following snippet sets the number of threads that are used by the doublingMap transformation.
here.platform.data-processing.deltasets.doublingMap {
threads = 3
}
Configuration files adhere to the namespaces of the transformation. For example,
here.platform.data-processing.deltasets.namespace1 can be used to configure all transformations
created in the namespace namespace1. If a transformation with ID id1 is created inside
namespace1, then its configuration is constructed by applying the settings defined in
here.platform.data-processing.deltasets.default,
here.platform.data-processing.deltasets.namespace1
and here.platform.data-processing.deltasets.namespace1.id1 in this order.
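The layering can be pictured as successive overrides: each later path replaces the settings it defines and keeps the rest. The plain-Scala sketch below models the order described above with maps of setting names to values. `ToyConfigLayers`, `configPaths`, and `effective` are hypothetical names, and the sketch is not the library's configuration loader.

```scala
object ToyConfigLayers {
  type Settings = Map[String, String]
  val Prefix = "here.platform.data-processing.deltasets"

  // Paths consulted, in order, for transformation `id` created in `namespace`.
  def configPaths(namespace: String, id: String): Seq[String] =
    Seq(s"$Prefix.default", s"$Prefix.$namespace", s"$Prefix.$namespace.$id")

  // Merges the settings found at each path; later paths override earlier ones.
  def effective(paths: Seq[String], file: Map[String, Settings]): Settings =
    paths.foldLeft(Map.empty[String, String]) { (acc, path) =>
      acc ++ file.getOrElse(path, Map.empty[String, String])
    }
}
```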
The configuration read from the configuration file can also be overridden programmatically in the application code. Programmatic overrides take precedence over all settings defined in configuration files.
val deltaSet1: DeltaSet[Key, Int] = ???
val doubled: DeltaSet[Key, Int] = deltaSet1
.mapValues(_ * 2)
.withConfigOverride(
c => c.withIncremental(false)
)
Footnotes