
You can read partition metadata and the content of each partition (referred to as partition data) from the following layers:

  • Versioned
  • Volatile
  • Index
  • Stream

The read_partitions and read_stream functions return iterators, so values are consumed one at a time.

Note

Partition metadata, and the concept of partitions in general, do not apply to Interactive Map or Object Store layers. See the sections on these layer types further down this page for the read methods applicable to each.

Note

Data can be downloaded as a stream by passing the stream=True parameter. Streamed data is returned as Iterator[bytes], where one chunk of data is downloaded with each iteration. Decoding is not supported with streaming data, and decode=False is implied if stream=True is provided. A default chunk size that balances memory consumption and download throughput is used unless overridden with the chunk_size parameter.
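
As a rough sketch of how streamed content might be consumed, the helper below writes an Iterator[bytes] to a file-like object one chunk at a time, so the whole payload never has to sit in memory at once. The names write_chunks and fake_stream are hypothetical and not part of the SDK; with the SDK, the iterator would come from a read call made with stream=True.

```python
import io
from typing import BinaryIO, Iterable, Iterator


def write_chunks(chunks: Iterable[bytes], sink: BinaryIO) -> int:
    """Write each chunk to sink as it arrives and return the total byte count."""
    total = 0
    for chunk in chunks:
        sink.write(chunk)
        total += len(chunk)
    return total


def fake_stream(payload: bytes, chunk_size: int) -> Iterator[bytes]:
    """Hypothetical stand-in for the iterator returned when stream=True."""
    for start in range(0, len(payload), chunk_size):
        yield payload[start:start + chunk_size]


buffer = io.BytesIO()
written = write_chunks(fake_stream(b"0123456789", chunk_size=4), buffer)
print(written, buffer.getvalue())
```

In practice the sink would typically be a file opened in binary mode rather than an in-memory buffer.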

To execute the read examples below, you would first need to run the following code that defines some example catalogs and layers:

from here.platform import Platform

platform = Platform()

oma_catalog = platform.get_catalog('hrn:here:data::olp-here:oma-3')
versioned_layer = oma_catalog.get_layer('topology_geometry_segment')

wx_catalog = platform.get_catalog('hrn:here:data::olp-here:live-weather-eu')
volatile_layer = wx_catalog.get_layer('latest-data')

sdii_catalog = platform.get_catalog('hrn:here:data::olp-here:olp-sdii-sample-berlin-2')
index_layer = sdii_catalog.get_layer('sample-index-layer')

traffic_catalog = platform.get_catalog('hrn:here:data::olp-here:olp-traffic-1')
stream_layer = traffic_catalog.get_layer('traffic-incidents-delta-stream')

samples_catalog = platform.get_catalog('hrn:here:data::olp-here:here-geojson-samples')
objectstore_layer = samples_catalog.get_layer('objectstore')

test_catalog = platform.get_catalog(...)
interactive_map_layer = test_catalog.get_layer(...)

Read partitions/data from versioned layer

To read data stored in a versioned layer, use the read_partitions function of the VersionedLayer class. Given a set of partition identifiers, the function returns the data associated with the specified partitions. If no partitions are specified, the content of the whole layer is returned.

You can specify which version of the catalog to query; by default, the latest version is used.

While read_partitions queries layer metadata and downloads the data associated with each partition in a single call, it's also possible to query just the partition metadata and then obtain and decode the associated data manually at a later time, if needed. To query just the partition metadata, use the get_partitions_metadata function.

For additional information, an exhaustive list of parameters and adapter-specific parameters, please consult the documentation of VersionedLayer.

Example: reading two partitions

The read_partitions method returns an iterator that yields one object per requested partition. Each object is a tuple: the first element is of type VersionedPartition, and the second is the content of that partition. The partition content is decoded according to the content type (and schema, if applied) specified in the layer configuration, following the matrix above.

partitions = versioned_layer.read_partitions(partition_ids=[19377307, 19377333])

for p in partitions:
    versioned_partition, partition_content = p
    print(f"{versioned_partition.id}: {type(partition_content)}")
19377307: <class 'SegmentPartition'>
19377333: <class 'SegmentPartition'>

Example: reading a specific catalog version and skipping decoding

In this example, data is not decoded and partition_content is of type bytes.

partitions = versioned_layer.read_partitions(partition_ids=[19377307, 19377333], version=10, decode=False)

for p in partitions:
    versioned_partition, partition_content = p
    print(f"{versioned_partition.id}: {type(partition_content)}")
19377307: <class 'bytes'>
19377333: <class 'bytes'>

Example: streaming download

In this example, data is streamed and partition_content is of type Iterator[bytes].

partitions = versioned_layer.read_partitions(partition_ids=[19377307, 19377333], stream=True)

for p in partitions:
    versioned_partition, partition_content = p
    print(f"{versioned_partition.id}: {type(partition_content)}")
19377307: <class 'generator'>
19377333: <class 'generator'>

Example: obtaining the metadata and fetching the data manually

partitions = versioned_layer.get_partitions_metadata(partition_ids=[19377307, 19377333])

for partition in partitions:
    print(partition.id)
    data = partition.get_blob()
    print(data)

Read partitions/data from volatile layer

To read data stored in a volatile layer, use the read_partitions function of the VolatileLayer class. Given a set of partition identifiers, the function returns the data associated with the specified partitions. If no partitions are specified, the content of the whole layer is returned.

While read_partitions queries layer metadata and downloads the data associated with each partition in a single call, it's also possible to query just the partition metadata and then obtain and decode the associated data manually at a later time, if needed. To query just the partition metadata, use the get_partitions_metadata function.

For additional information, an exhaustive list of parameters and adapter-specific parameters, please consult the documentation of VolatileLayer.

Example: reading two partitions

The read_partitions method returns an iterator that yields one object per requested partition. Each object is a tuple: the first element is of type VolatilePartition, and the second is the content of that partition. The partition content is decoded according to the content type (and schema, if applied) specified in the layer configuration, following the matrix above.

partitions = volatile_layer.read_partitions(partition_ids=[92259, 92262])

for p in partitions:
    volatile_partition, partition_content = p
    print(f"{volatile_partition.id}: {type(partition_content)}")
92259: <class 'WeatherConditionPartition'>
92262: <class 'WeatherConditionPartition'>

Example: skipping decoding

In this example, data is not decoded and partition_content is of type bytes.

partitions = volatile_layer.read_partitions(partition_ids=[92259, 92262], decode=False)

for p in partitions:
    volatile_partition, partition_content = p
    print(f"{volatile_partition.id}: {type(partition_content)}")
92259: <class 'bytes'>
92262: <class 'bytes'>

Example: streaming download

In this example, data is streamed and partition_content is of type Iterator[bytes].

partitions = volatile_layer.read_partitions(partition_ids=[92259, 92262], stream=True)

for p in partitions:
    volatile_partition, partition_content = p
    print(f"{volatile_partition.id}: {type(partition_content)}")
92259: <class 'generator'>
92262: <class 'generator'>

Example: obtaining the metadata and fetching the data manually

partitions = volatile_layer.get_partitions_metadata(partition_ids=[92259, 92262])

for partition in partitions:
    print(partition.id)
    data = partition.get_blob()
    print(data)

Read partitions/data from index layer

To read data stored in an index layer, use the read_partitions function of the IndexLayer class. The function returns all the data associated with index partitions matching an RSQL query.

Data is decoded according to the content type specified in the layer configuration. The option of automatic decoding is supported only for content types indicated in the adapter support matrix above. In situations where content type is not supported, you should use decode=False and will need to decode the data separately after reading. Data may also be streamed in chunks using stream=True; streaming will skip decoding and imply decode=False.
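
When decode=False is used, the decoding step is left to the caller. The following is a minimal sketch, assuming purely for illustration that the partition content is UTF-8 encoded JSON; the real content type depends on the layer configuration. decode_json_bytes is a hypothetical helper, not an SDK function, and the sample bytes are made up.

```python
import json
from typing import Any


def decode_json_bytes(raw: bytes) -> Any:
    """Decode raw partition bytes, assuming they hold UTF-8 encoded JSON."""
    return json.loads(raw.decode("utf-8"))


# Made-up stand-in for the bytes returned by a read call with decode=False.
raw_partition_data = b'{"hour_from": 10, "events": [1, 2, 3]}'
record = decode_json_bytes(raw_partition_data)
print(record["hour_from"], len(record["events"]))
```

For streamed data, the chunks would first need to be joined (for example with b"".join(chunks)) or fed to an incremental parser suited to the format.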

While read_partitions queries index metadata via RSQL and downloads the data associated with each partition in a single call, it's also possible to query just the partition metadata via RSQL and then obtain and decode the associated data manually at a later time, if needed. To query just the partition metadata, use the get_partitions_metadata function.

For additional information, an exhaustive list of parameters and adapter-specific parameters, please consult the documentation of IndexLayer.

Example: query partitions and read all returned data

Each returned partition is of type IndexPartition and contains custom fields, as defined by the user who created the index layer. In this example, data is not decoded and partition_data is of type bytes.

partitions = index_layer.read_partitions(query="hour_from=ge=10", decode=False)

for partition, partition_data in partitions:
    print(partition.fields, partition_data)

Example: query partitions and stream returned data

partitions = index_layer.read_partitions(query="hour_from=ge=10", stream=True)

for partition, partition_data in partitions:
    for partition_chunk in partition_data:
        print(partition_chunk)

Example: obtaining the metadata and fetching the data manually

partitions = index_layer.get_partitions_metadata(query="hour_from=ge=10")

for partition in partitions:
    print(partition.fields)
    data = partition.get_blob()
    print(data)

Example: obtaining the part ids for fetching data by parts


parts = index_layer.get_parts(num_requested_parts=4, billing_tag="billing-tag-value")

Example: obtaining the metadata using part id and fetching the data manually


resp = index_layer.get_parts(num_requested_parts=4, billing_tag="billing-tag-value")

for val in resp['parts']:

    partitions = index_layer.get_partitions_metadata(query="hour_from=ge=10", part=val['partId'])

    for partition in partitions:
        print(partition.fields)
        data = partition.get_blob()
        print(data)
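
Splitting a query into parts is mainly useful when the parts are processed concurrently. The sketch below shows one possible pattern using a thread pool from the standard library. fetch_part is a hypothetical placeholder for a function that would call get_partitions_metadata with a given part id, and the part ids are made up; only the response shape mirrors the resp['parts'] and 'partId' access used above.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def fetch_part(part_id: str) -> List[str]:
    """Hypothetical placeholder: would query one part and return its results."""
    return [f"{part_id}-partition-{i}" for i in range(2)]


# Made-up response with the same shape as the dict indexed in the example above.
resp = {"parts": [{"partId": "a"}, {"partId": "b"}, {"partId": "c"}]}
part_ids = [part["partId"] for part in resp["parts"]]

with ThreadPoolExecutor(max_workers=4) as pool:
    # map() preserves input order, so results line up with part_ids.
    results: Dict[str, List[str]] = dict(zip(part_ids, pool.map(fetch_part, part_ids)))

for part_id, partitions in results.items():
    print(part_id, partitions)
```

Threads suit this kind of I/O-bound work; whether the SDK objects can safely be shared across threads is not stated in this guide and would need to be checked.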

Read partitions/data from stream layer

To read from a stream layer, a subscription must first be created via the subscribe function of the StreamLayer class. The function instantiates a Kafka consumer on the HERE platform, which is later queried via its REST API to read messages from the layer. The subscribe function returns a StreamSubscription. Please unsubscribe when reading is complete to free resources on the platform.

To consume data stored in a stream layer, please refer to the read_stream function. The function consumes the stream and returns the messages and corresponding content. This method requires a StreamSubscription.

While read_stream consumes the messages and downloads the data associated with each message in a single call, it's also possible to consume just the messages (partition metadata) and then obtain and decode the associated data manually at a later time, if needed. To consume just the metadata, use the get_stream_metadata function. This function also requires a StreamSubscription.

For additional information, an exhaustive list of parameters and adapter-specific parameters, please consult the documentation of StreamLayer.

Example: consuming content

Data is decoded according to the content type specified in the layer configuration. The type of the value returned in content depends on the content type and adapter used, according to the matrix above.

Each returned partition is of type StreamPartition.

When the with block exits, whether normally or because of an exception, subscription.unsubscribe() is called internally.


with stream_layer.subscribe() as subscription:
    partitions = stream_layer.read_stream(subscription=subscription)
    for partition, content in partitions:
        print(f"{partition.id}: {content}")
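
The cleanup behavior of the with block can be pictured with a small stand-in class. This is only an illustration of Python's context manager protocol; FakeSubscription is hypothetical and says nothing about how StreamSubscription is implemented internally.

```python
class FakeSubscription:
    """Hypothetical stand-in that records whether unsubscribe() was called."""

    def __init__(self) -> None:
        self.active = True

    def unsubscribe(self) -> None:
        self.active = False

    def __enter__(self) -> "FakeSubscription":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        # Runs on normal exit and on exceptions alike.
        self.unsubscribe()
        return False  # do not swallow exceptions


subscription = FakeSubscription()
try:
    with subscription:
        raise RuntimeError("simulated failure while reading")
except RuntimeError:
    pass

print(subscription.active)
```

The exception still propagates out of the with block, but the cleanup has already run by the time it is caught.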

Example: skipping decoding

In this example, data is not decoded and content is of type bytes.

subscription = stream_layer.subscribe()

try:
    partitions = stream_layer.read_stream(subscription=subscription, decode=False)
    for partition, content in partitions:
        print(f"{partition.id}: {content}")
finally:
    subscription.unsubscribe()

Example: stream data

In this example, data is not decoded and content is of type Iterator[bytes].

subscription = stream_layer.subscribe()

try:
    partitions = stream_layer.read_stream(subscription=subscription, stream=True)
    for partition, content in partitions:
        print(partition.id)
        for content_chunk in content:
            print(content_chunk)
finally:
    subscription.unsubscribe()

Example: obtaining the metadata and fetching the data manually

A distinguishing characteristic of the stream layer, compared for example to versioned and volatile layers, is that partition metadata (messages on the Kafka stream) can contain the data inline if it is small enough. The data may be included directly in each message instead of being stored through the Blob API.

get_blob returns the data by retrieving it from the Blob API. get_data, specific to the stream layer, returns the data if inline and, only when needed, retrieves it from the Blob API. It is therefore recommended to use get_data.

subscription = stream_layer.subscribe()

try:
    partitions = stream_layer.get_stream_metadata(subscription=subscription)

    for partition in partitions:
        print(partition.id, partition.timestamp)
        data = partition.get_data()
        print(data)

finally:
    subscription.unsubscribe()
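
The difference between get_blob and get_data can be pictured with the following model. It is a simplified illustration of the behavior described above, not the SDK's implementation: FakeStreamPartition, its inline_data attribute, and the blob_fetcher callable are all hypothetical.

```python
from typing import Callable, List, Optional


class FakeStreamPartition:
    """Hypothetical model of a stream message with optional inline data."""

    def __init__(self, inline_data: Optional[bytes], blob_fetcher: Callable[[], bytes]) -> None:
        self.inline_data = inline_data
        self._blob_fetcher = blob_fetcher

    def get_blob(self) -> bytes:
        # Always goes to the (simulated) blob store.
        return self._blob_fetcher()

    def get_data(self) -> bytes:
        # Prefer inline data; fall back to the blob store only when needed.
        if self.inline_data is not None:
            return self.inline_data
        return self.get_blob()


blob_calls: List[int] = []


def fetch_from_blob_store() -> bytes:
    """Simulated remote fetch that counts how often it is invoked."""
    blob_calls.append(1)
    return b"large payload"


small = FakeStreamPartition(inline_data=b"tiny", blob_fetcher=fetch_from_blob_store)
large = FakeStreamPartition(inline_data=None, blob_fetcher=fetch_from_blob_store)

print(small.get_data(), large.get_data(), len(blob_calls))
```

In this model only the message without inline data triggers a remote fetch, which is the reason given above for preferring get_data.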

Example: consuming and producing content using direct Kafka

Direct Kafka support lets you retrieve instances of a Kafka consumer and a Kafka producer, which you can use to read from and write to the stream layer. Both instances are configurable.

For more details on topic-level consumer configuration, see: https://kafka.apache.org/11/documentation.html#newconsumerconfigs

For more details on topic-level producer configuration, see: https://kafka.apache.org/11/documentation.html#producerconfigs

import json

producer = stream_layer.kafka_producer(value_serializer=lambda x:json.dumps(x).encode('utf-8'))
topic = stream_layer.get_kafka_topic()
for x in range(10):
    data = {'x': x, '2x': x*2}
    producer.send(topic, value=data)
producer.close()

consumer = stream_layer.kafka_consumer(value_deserializer=lambda x:json.loads(x.decode('utf-8')))
for message in consumer:
    print(f"Message is {message.value}")
consumer.close()
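
The serializer and deserializer passed above are plain callables, so they can be checked in isolation before being handed to the producer and consumer. A small sketch using only the standard library, confirming that the two lambdas round-trip a sample message (the variable names are chosen here for illustration):

```python
import json

# Same callables as in the example above, bound to names for testing.
value_serializer = lambda x: json.dumps(x).encode('utf-8')
value_deserializer = lambda x: json.loads(x.decode('utf-8'))

message = {'x': 3, '2x': 6}
wire_bytes = value_serializer(message)
round_tripped = value_deserializer(wire_bytes)

print(wire_bytes)
print(round_tripped == message)
```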

Read features from Interactive Map layer

This layer type does not have the concept of partitions and encoded data. There are no functions that read raw data or support decode parameters. Interactive Map layers are designed around the concept of features, in the sense of GeoJSON FeatureCollection, instead of partitions.

When using the default adapter, a FeatureCollection or an iterator of Feature objects (both GeoJSON concepts) is returned directly.

Functions for retrieving features from the layer include:

  • get_features
  • search_features
  • iter_features
  • get_features_in_bounding_box
  • spatial_search
  • spatial_search_geometry

Example: Read multiple features from an Interactive Map layer using get_features

features = interactive_map_layer.get_features(feature_ids=["feature_id1", "feature_id2", "feature_id3"])

Example: Search for and retrieve features based on properties using search_features

features = interactive_map_layer.search_features(params={"p.name": "foo", "p.type": "bar"})

Example: Retrieve all features in a layer using iter_features

for feature in interactive_map_layer.iter_features():
    print(feature)

Example: Find and retrieve all features in a bounding box using get_features_in_bounding_box

bbox_features = interactive_map_layer.get_features_in_bounding_box(bounds=(-171.791110603, 18.91619, -66.96466, 71.3577635769))

Example: Find and retrieve all features within given radius of input point using spatial_search

features = interactive_map_layer.spatial_search(lng=-95.95417, lat=41.6065, radius=1000)

Example: Find and retrieve all features within arbitrary geometry using spatial_search_geometry

from shapely.geometry import Polygon

polygon = Polygon([(0, 0), (1, 1), (1, 0)])
features = interactive_map_layer.spatial_search_geometry(geometry=polygon)

Interactive Map layer subscriptions to a destination stream layer

Functions for managing Interactive Map layer (IML) subscriptions to a destination stream layer include:

  • get_all_subscriptions
  • subscribe
  • subscription_status
  • subscription_exists
  • get_subscription
  • unsubscribe

Example: List all subscriptions using get_all_subscriptions


subscriptions_gen = interactive_map_layer.get_all_subscriptions(limit=10)

for subscriptions_list in subscriptions_gen:
    print([subscription.subscription_hrn for subscription in subscriptions_list])
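
Since each value yielded by the generator is a list (one page of subscriptions), it can be convenient to flatten the pages into a single sequence of items. A minimal sketch using itertools from the standard library; fake_pages is a hypothetical stand-in for the generator returned by get_all_subscriptions, and the HRN strings are made up.

```python
from itertools import chain
from typing import Iterator, List


def fake_pages() -> Iterator[List[str]]:
    """Hypothetical stand-in yielding one list of subscription HRNs per page."""
    yield ["hrn:example:sub-1", "hrn:example:sub-2"]
    yield ["hrn:example:sub-3"]


# chain.from_iterable lazily walks each page in turn.
all_hrns = list(chain.from_iterable(fake_pages()))
print(all_hrns)
```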

Example: Subscribe to Destination Stream Layer using subscribe

import uuid
from here.platform.layer import InteractiveMapSubscriptionType

x_idempotency_key=str(uuid.uuid4())
interactive_map_subscription = interactive_map_layer.subscribe(subscription_name="test-iml-subscription-name-123",
                        description="this is a test iml subscription",
                        destination_catalog_hrn="hrn:here:data::olp-cs:pysdk-test-catalog-for-iml-subs",
                        destination_layer_id="stream-raw-demo",
                        interactive_map_subscription_type=InteractiveMapSubscriptionType.PER_FEATURE)

interactive_map_subscription

Example: Get subscription status using subscription_status


subscription_status = interactive_map_layer.subscription_status(status_token=interactive_map_subscription.status_token)
subscription_status, subscription_status.status

Example: Check whether a subscription exists using subscription_exists


subscription_exists = interactive_map_layer.subscription_exists(subscription_hrn=interactive_map_subscription.subscription_hrn)
subscription_exists

Example: Get a subscription using get_subscription

subscription = interactive_map_layer.get_subscription(subscription_hrn=interactive_map_subscription.subscription_hrn)
subscription, subscription.subscription_name, subscription.subscription_hrn

Example: Unsubscribe from Destination Stream Layer using unsubscribe

import uuid

x_idempotency_key=str(uuid.uuid4())
unsubscribed = interactive_map_layer.unsubscribe(subscription_hrn=interactive_map_subscription.subscription_hrn)
unsubscribed, unsubscribed.status_token

Read data from object store layer

Object Store layers offer a key/value store that mimics the behavior of a filesystem. Keys correspond to the file path and name (relative to the root of the layer). The corresponding values are bytes holding the contents of that file. Layers of this type do not have the concept of partitions.

Functions provided for retrieving key/value information from Object Store layers include:

  • key_exists
  • list_keys
  • iter_keys
  • get_object_metadata
  • read_object

Example: Check if layer contains a given key with key_exists

key_found = objectstore_layer.key_exists(key = "berlin/districts_of_berlin_tiled/23618356")

Example: List all keys (subdirectories and files) under a given parent key (directory) with list_keys

The optional parent parameter specifies which level in the layer's key hierarchy to use as root for the listing. If omitted, the root of the layer itself is used. The deep parameter indicates whether you want to retrieve the entire hierarchy of descendants (True) or only direct descendants (False). If omitted, deep is assumed to be False. This method will return a list of keys as strings.

everything_under_berlin_folder = objectstore_layer.list_keys(parent = "berlin", deep = True)
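
The effect of parent and deep can be illustrated with a small in-memory model. This is only a sketch of the semantics described above, not the SDK's implementation: list_keys_model and the sample keys are hypothetical, in deep mode the model lists only the stored keys, and the exact form of the keys returned by the real method (for example, how subdirectories are represented) is not covered here.

```python
from typing import List, Optional


def list_keys_model(all_keys: List[str], parent: Optional[str] = None, deep: bool = False) -> List[str]:
    """Return keys under parent: every descendant if deep, else direct children only."""
    prefix = f"{parent.rstrip('/')}/" if parent else ""
    found = set()
    for key in all_keys:
        if not key.startswith(prefix):
            continue
        relative = key[len(prefix):]
        if deep:
            found.add(key)
        else:
            # Keep only the first path segment below the parent.
            found.add(prefix + relative.split("/", 1)[0])
    return sorted(found)


# Made-up keys resembling a small directory tree.
sample_keys = [
    "berlin/districts/1",
    "berlin/districts/2",
    "berlin/readme.txt",
    "paris/readme.txt",
]

print(list_keys_model(sample_keys, parent="berlin"))
print(list_keys_model(sample_keys, parent="berlin", deep=True))
```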

Example: Get an Iterator over all keys under a given parent key (directory) with iter_keys

This method is analogous to list_keys, but returns an Iterator instead of a List.

berlin_files_iter = objectstore_layer.iter_keys(parent = "berlin", deep = True)

Example: Get metadata for an object with get_object_metadata

This method returns metadata associated with a given key. The information contained includes:

  • Last-Modified date
  • Data size
  • Content type
  • Content encoding

obj_metadata = objectstore_layer.get_object_metadata("berlin/districts_of_berlin_tiled/23618355")

Example: Read object for a given key with read_object

The read_object method offers an optional include_metadata parameter which specifies whether or not to also return the metadata associated with an object. By default, the value is False and read_object will return only the content requested. If True, a tuple containing content plus object_metadata will be returned. If stream is True, the data will be returned as Iterator[bytes] rather than bytes.

geojson_content = objectstore_layer.read_object("berlin/districts_of_berlin_tiled/23618355")