Read from Layer
You can read partition metadata and the content of each partition (referred to as partition data) from the following layers:
- Versioned
- Volatile
- Index
- Stream
The read_partitions and read_stream functions return iterators, so results are consumed one value at a time.
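Because the results arrive through an iterator, you can stop early without paying for values you never request. The sketch below illustrates this with plain Python only; fake_read_partitions is a hypothetical stand-in generator, not an SDK function, and the partition ids and payloads are made up.

```python
from itertools import islice
from typing import Iterator, Tuple


def fake_read_partitions() -> Iterator[Tuple[int, bytes]]:
    """Hypothetical stand-in for a layer read: yields (partition_id, content) lazily."""
    for partition_id in (101, 102, 103, 104):
        # In a real read, the download would happen here, one partition per step.
        yield partition_id, f"payload-{partition_id}".encode("utf-8")


# Take only the first two results; the remaining values are never produced.
first_two = list(islice(fake_read_partitions(), 2))
first_two_ids = [partition_id for partition_id, _ in first_two]
print(first_two_ids)
```

Wrapping the iterator in list() would instead force every value to be produced up front, which is usually not what you want for large reads.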
Note
Partition metadata, and the concept of partitions in general, do not apply to Interactive Map or Object Store layers. See the sections below on this page for the read methods specific to each of these layer types.
Note
Data can be downloaded with the stream=True parameter. Streamed data is returned as Iterator[bytes], where a chunk of data is downloaded with each iteration. Decoding is not supported with streamed data, and decode=False is implied if stream=True is provided. A default chunk size that balances memory consumption and download throughput is used unless overridden with the chunk_size parameter.
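One common way to consume an Iterator[bytes] is to write each chunk to a sink as it arrives, so the full payload never has to sit in memory. The following is a minimal sketch under stated assumptions: fake_chunk_stream is a hypothetical stand-in for streamed content, and write_chunks is an illustrative helper, neither is part of the SDK.

```python
import io
from typing import BinaryIO, Iterator


def fake_chunk_stream(payload: bytes, chunk_size: int) -> Iterator[bytes]:
    """Hypothetical stand-in for streamed partition content (Iterator[bytes])."""
    for start in range(0, len(payload), chunk_size):
        yield payload[start:start + chunk_size]


def write_chunks(chunks: Iterator[bytes], sink: BinaryIO) -> int:
    """Write each chunk as it arrives and return the number of bytes written."""
    total = 0
    for chunk in chunks:
        sink.write(chunk)
        total += len(chunk)
    return total


payload = b"0123456789" * 5
buffer = io.BytesIO()  # a file opened in binary write mode would work the same way
written = write_chunks(fake_chunk_stream(payload, chunk_size=16), buffer)
print(written)
```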
To execute the read examples below, you would first need to run the following code that defines some example catalogs and layers:
from here.platform import Platform
platform = Platform()
oma_catalog = platform.get_catalog('hrn:here:data::olp-here:oma-3')
versioned_layer = oma_catalog.get_layer('topology_geometry_segment')
wx_catalog = platform.get_catalog('hrn:here:data::olp-here:live-weather-eu')
volatile_layer = wx_catalog.get_layer('latest-data')
sdii_catalog = platform.get_catalog('hrn:here:data::olp-here:olp-sdii-sample-berlin-2')
index_layer = sdii_catalog.get_layer('sample-index-layer')
traffic_catalog = platform.get_catalog('hrn:here:data::olp-here:olp-traffic-1')
stream_layer = traffic_catalog.get_layer('traffic-incidents-delta-stream')
samples_catalog = platform.get_catalog('hrn:here:data::olp-here:here-geojson-samples')
objectstore_layer = samples_catalog.get_layer('objectstore')
test_catalog = platform.get_catalog(...)
interactive_map_layer = test_catalog.get_layer(...)
Read partitions/data from versioned layer
To read data stored in a versioned layer, refer to the read_partitions function of the VersionedLayer class. Given a set of partition identifiers, the function returns the data associated with the specified partitions. If no partitions are specified, the content of the whole layer is returned.
You can specify which version of the catalog to query; by default, the latest version is used.
While read_partitions queries layer metadata and downloads the data associated with each partition in a single call, it is also possible to query just the partition metadata and then obtain and decode the associated data manually at a later time, if needed. To query just the partition metadata, use the get_partitions_metadata function.
For additional information, an exhaustive list of parameters and adapter-specific parameters, please consult the documentation of VersionedLayer.
Example: reading two partitions
The read_partitions method returns an iterator over objects for each requested partition. Each object is a tuple, the first element being of type VersionedPartition, and the second being the contents of that partition. The partition content is decoded according to the content type (and schema, if applied) specified in the layer configuration, according to the matrix above.
partitions = versioned_layer.read_partitions(partition_ids=[19377307, 19377333])
for p in partitions:
    versioned_partition, partition_content = p
    print(f"{versioned_partition.id}: {type(partition_content)}")

19377307: <class 'SegmentPartition'>
19377333: <class 'SegmentPartition'>
Example: reading a specific catalog version and skipping decoding
In this example, data is not decoded and partition_content is of type bytes.
partitions = versioned_layer.read_partitions(partition_ids=[19377307, 19377333], version=10, decode=False)
for p in partitions:
    versioned_partition, partition_content = p
    print(f"{versioned_partition.id}: {type(partition_content)}")

19377307: <class 'bytes'>
19377333: <class 'bytes'>
Example: streaming download
In this example, data is streamed and partition_content is of type Iterator[bytes].
partitions = versioned_layer.read_partitions(partition_ids=[19377307, 19377333], stream=True)
for p in partitions:
    versioned_partition, partition_content = p
    print(f"{versioned_partition.id}: {type(partition_content)}")

19377307: <class 'bytes'>
19377333: <generator object generate at 0x7f9594a11cb0>
Example: obtaining the metadata and fetching the data manually
partitions = versioned_layer.get_partitions_metadata(partition_ids=[19377307, 19377333])
for partition in partitions:
    print(partition.id)
    data = partition.get_blob()
    print(data)
Read partitions/data from volatile layer
To read data stored in a volatile layer, refer to the read_partitions function of the VolatileLayer class. Given a set of partition identifiers, the function returns the data associated with the specified partitions. If no partitions are specified, the content of the whole layer is returned.
While read_partitions queries layer metadata and downloads the data associated with each partition in a single call, it is also possible to query just the partition metadata and then obtain and decode the associated data manually at a later time, if needed. To query just the partition metadata, use the get_partitions_metadata function.
For additional information, an exhaustive list of parameters and adapter-specific parameters, please consult the documentation of VolatileLayer.
Example: reading two partitions
The read_partitions method returns an iterator over objects for each requested partition. Each object is a tuple, the first element being of type VolatilePartition, and the second being the contents of that partition. The partition content is decoded according to the content type (and schema, if applied) specified in the layer configuration, according to the matrix above.
partitions = volatile_layer.read_partitions(partition_ids=[92259, 92262])
for p in partitions:
    volatile_partition, partition_content = p
    print(f"{volatile_partition.id}: {type(partition_content)}")

92259: <class 'WeatherConditionPartition'>
92262: <class 'WeatherConditionPartition'>
Example: skipping decoding
In this example, data is not decoded and partition_content is of type bytes.
partitions = volatile_layer.read_partitions(partition_ids=[92259, 92262], decode=False)
for p in partitions:
    volatile_partition, partition_content = p
    print(f"{volatile_partition.id}: {type(partition_content)}")

92259: <class 'bytes'>
92262: <class 'bytes'>
Example: streaming download
In this example, data is streamed and partition_content is of type Iterator[bytes].
partitions = volatile_layer.read_partitions(partition_ids=[92259, 92262], stream=True)
for p in partitions:
    volatile_partition, partition_content = p
    print(f"{volatile_partition.id}: {type(partition_content)}")

92259: <class 'bytes'>
92262: <generator object generate at 0x7f9594a11cb0>
Example: obtaining the metadata and fetching the data manually
partitions = volatile_layer.get_partitions_metadata(partition_ids=[92259, 92262])
for partition in partitions:
    print(partition.id)
    data = partition.get_blob()
    print(data)
Read partitions/data from index layer
To read data stored in an index layer, refer to the read_partitions function of the IndexLayer class. The function returns all the data associated with index partitions matching an RSQL query.
Data is decoded according to the content type specified in the layer configuration. Automatic decoding is supported only for the content types indicated in the adapter support matrix above. Where the content type is not supported, use decode=False and decode the data separately after reading. Data may also be streamed in chunks using stream=True; streaming skips decoding and implies decode=False.
While read_partitions queries index metadata via RSQL and downloads the data associated with each partition in a single call, it is also possible to query just the partition metadata via RSQL and then obtain and decode the associated data manually at a later time, if needed. To query just the partition metadata, use the get_partitions_metadata function.
For additional information, an exhaustive list of parameters and adapter-specific parameters, please consult the documentation of IndexLayer.
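When you read with decode=False, decoding the returned bytes is up to you. The sketch below shows one possible way to organize that step, assuming you know the content type of the layer; the DECODERS table and the decode_manually helper are hypothetical names for illustration and are not part of the SDK, and raw_bytes stands in for the partition data.

```python
import json
from typing import Any, Callable, Dict

# Hypothetical mapping from content type to a decoder; extend it as needed.
DECODERS: Dict[str, Callable[[bytes], Any]] = {
    "application/json": lambda raw: json.loads(raw.decode("utf-8")),
    "text/plain": lambda raw: raw.decode("utf-8"),
}


def decode_manually(raw: bytes, content_type: str) -> Any:
    """Decode raw bytes if a decoder is registered, otherwise return them unchanged."""
    decoder = DECODERS.get(content_type)
    return decoder(raw) if decoder else raw


raw_bytes = b'{"hour_from": 10, "value": 42}'  # stand-in for partition data
decoded = decode_manually(raw_bytes, "application/json")
print(decoded)
```

Returning the bytes unchanged for unknown content types keeps the helper safe to call on any partition; raising an error instead would be an equally reasonable choice.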
Example: query partitions and read all returned data
Each returned partition is of type IndexPartition, and contains custom fields, as defined by the user who created the index layer. In this example, data is not decoded and partition_data is bytes.
partitions = index_layer.read_partitions(query="hour_from=ge=10", decode=False)
for partition, partition_data in partitions:
    print(partition.fields, partition_data)
Example: query partitions and stream returned data
partitions = index_layer.read_partitions(query="hour_from=ge=10", stream=True)
for partition, partition_data in partitions:
    for partition_chunk in partition_data:
        print(partition_chunk)
Example: obtaining the metadata and fetching the data manually
partitions = index_layer.get_partitions_metadata(query="hour_from=ge=10")
for partition in partitions:
    print(partition.fields)
    data = partition.get_blob()
    print(data)
Example: obtaining the part IDs for fetching data by parts
parts = index_layer.get_parts(num_requested_parts=4, billing_tag="billing-tag-value")
Example: obtaining the metadata using a part ID and fetching the data manually
resp = index_layer.get_parts(num_requested_parts=4, billing_tag="billing-tag-value")
for val in resp['parts']:
    partitions = index_layer.get_partitions_metadata(query="hour_from=ge=10", part=val['partId'])
    for partition in partitions:
        print(partition.fields)
        data = partition.get_blob()
        print(data)
Read partitions/data from stream layer
To read from a stream layer, a subscription must first be created via the subscribe function of the StreamLayer class. The function instantiates a Kafka consumer on the HERE platform, which is later queried via its REST API to read messages from the layer. The subscribe function returns a StreamSubscription. Please unsubscribe when reading is complete to free resources on the platform.
To consume data stored in a stream layer, refer to the read_stream function. The function consumes the stream and returns the messages and corresponding content. This method requires a StreamSubscription.
While read_stream consumes the messages and downloads the data associated with each message in a single call, it is also possible to consume and retrieve just the messages (partition metadata) and then obtain and decode the associated data manually at a later time, if needed. To consume just the metadata, use the get_stream_metadata function. This function also requires a StreamSubscription.
For additional information, an exhaustive list of parameters and adapter-specific parameters, please consult the documentation of StreamLayer.
Example: consuming content
Data is decoded according to the content type specified in the layer configuration. The type of what is actually returned as content depends on the content type and adapter used, according to the matrix above.
Each returned partition is of type StreamPartition.
When the with block terminates either successfully or unsuccessfully, subscription.unsubscribe() will be called internally.
with stream_layer.subscribe() as subscription:
    partitions = stream_layer.read_stream(subscription=subscription)
    for partition, content in partitions:
        print(f"{partition.id}: {content}")
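The cleanup guarantee of the with block described above can be illustrated without platform access. The sketch below uses FakeSubscription, a hypothetical stand-in class that is not the SDK's StreamSubscription, to show the general context-manager mechanism: the exit hook runs whether the block finishes normally or raises.

```python
class FakeSubscription:
    """Hypothetical stand-in for a subscription object; not the SDK class."""

    def __init__(self) -> None:
        self.active = True

    def unsubscribe(self) -> None:
        self.active = False

    def __enter__(self) -> "FakeSubscription":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        # Runs on normal exit and when an exception propagates out of the block.
        self.unsubscribe()
        return False  # do not suppress exceptions


subscription = FakeSubscription()
try:
    with subscription:
        raise RuntimeError("simulated failure while reading")
except RuntimeError:
    pass

print(subscription.active)
```

This is the same guarantee that an explicit try/finally with a call to unsubscribe() gives; the with form is simply harder to get wrong.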
Example: skipping decoding
In this example, data is not decoded and content is of type bytes.
subscription = stream_layer.subscribe()
try:
    partitions = stream_layer.read_stream(subscription=subscription, decode=False)
    for partition, content in partitions:
        print(f"{partition.id}: {content}")
finally:
    subscription.unsubscribe()
Example: stream data
In this example, data is not decoded and content is of type Iterator[bytes].
subscription = stream_layer.subscribe()
try:
    partitions = stream_layer.read_stream(subscription=subscription, stream=True)
    for partition, content in partitions:
        print(partition.id)
        for content_chunk in content:
            print(content_chunk)
finally:
    subscription.unsubscribe()
Example: obtaining the metadata and fetching the data manually
A distinguishing characteristic of the stream layer, compared for example to versioned and volatile layers, is that partition metadata (messages on the Kafka stream) can contain the data inline if it is small enough. In that case, the data is included directly in each message instead of being stored through the Blob API.
get_blob returns the data by retrieving it from the Blob API. get_data, specific to the stream layer, returns the data if inline and, only when needed, retrieves it from the Blob API. It is therefore recommended to use get_data.
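The difference between the two accessors can be pictured with a small model. This is only a sketch of the idea described above: FakeStreamMessage and fetch_blob are hypothetical stand-ins, the inline_data attribute name is an assumption, and the real StreamPartition may be implemented differently.

```python
from typing import Callable, List, Optional


class FakeStreamMessage:
    """Hypothetical stand-in for a stream message; not the SDK's StreamPartition."""

    def __init__(self, inline_data: Optional[bytes], fetch_blob: Callable[[], bytes]) -> None:
        self.inline_data = inline_data  # assumed attribute name, for illustration
        self._fetch_blob = fetch_blob

    def get_blob(self) -> bytes:
        # In this model, always goes to the (simulated) blob store.
        return self._fetch_blob()

    def get_data(self) -> bytes:
        # Prefer the inline payload; fall back to the blob store only when needed.
        if self.inline_data is not None:
            return self.inline_data
        return self.get_blob()


blob_requests: List[int] = []


def fetch_blob() -> bytes:
    """Simulated blob download that records each request."""
    blob_requests.append(1)
    return b"large payload from blob store"


small = FakeStreamMessage(inline_data=b"small inline payload", fetch_blob=fetch_blob)
large = FakeStreamMessage(inline_data=None, fetch_blob=fetch_blob)

small_data = small.get_data()  # served from the message itself
large_data = large.get_data()  # triggers one simulated blob request
print(len(blob_requests))
```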
subscription = stream_layer.subscribe()
try:
    partitions = stream_layer.get_stream_metadata(subscription=subscription)
    for partition in partitions:
        print(partition.id, partition.timestamp)
        data = partition.get_data()
        print(data)
finally:
    subscription.unsubscribe()
Example: consuming and producing content using direct Kafka
Direct Kafka support lets users retrieve instances of a Kafka consumer and a Kafka producer. With these instances, users can write to and read from the stream layer. The instances are configurable.
For more details on topic-level consumer configuration, see https://kafka.apache.org/11/documentation.html#newconsumerconfigs.
For more details on topic-level producer configuration, see https://kafka.apache.org/11/documentation.html#producerconfigs.
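The example that follows passes a JSON value_serializer to the producer and a JSON value_deserializer to the consumer. Before wiring them into Kafka, it can help to check that the two callables are inverses of each other. The sketch below does that with plain Python and no Kafka connection; serialize and deserialize are illustrative names for the same logic as the lambdas in the example.

```python
import json


def serialize(value) -> bytes:
    """Same logic as the value_serializer lambda: object -> UTF-8 JSON bytes."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw: bytes):
    """Same logic as the value_deserializer lambda: UTF-8 JSON bytes -> object."""
    return json.loads(raw.decode("utf-8"))


messages = [{"x": x, "2x": x * 2} for x in range(3)]
round_tripped = [deserialize(serialize(message)) for message in messages]
print(round_tripped == messages)
```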
import json
producer = stream_layer.kafka_producer(value_serializer=lambda x: json.dumps(x).encode('utf-8'))
topic = stream_layer.get_kafka_topic()
for x in range(10):
    data = {'x': x, '2x': x*2}
    producer.send(topic, value=data)
producer.close()
consumer = stream_layer.kafka_consumer(value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
    print(f"Message is {message.value}")
consumer.close()
Read features from Interactive Map layer
This layer type does not have the concept of partitions and encoded data. There are no functions that read raw data or support decode parameters. Interactive Map layers are designed around the concept of features, in the sense of GeoJSON FeatureCollection, instead of partitions.
When using the default adapter, a FeatureCollection or an iterator of Feature objects (both GeoJSON concepts) is returned directly.
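For readers less familiar with GeoJSON, the sketch below shows the shape of a FeatureCollection as a plain Python dict and how its features can be walked. The feature ids, coordinates, and property values are made up for illustration, and the objects returned by the SDK may be richer types than plain dicts.

```python
# A minimal GeoJSON FeatureCollection, written as a plain Python dict.
feature_collection = {
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "id": "feature_id1",
            # GeoJSON positions are [longitude, latitude].
            "geometry": {"type": "Point", "coordinates": [13.405, 52.52]},
            "properties": {"name": "foo", "type": "bar"},
        },
        {
            "type": "Feature",
            "id": "feature_id2",
            "geometry": {"type": "Point", "coordinates": [2.3522, 48.8566]},
            "properties": {"name": "baz", "type": "bar"},
        },
    ],
}

# Each feature carries its own id, geometry, and free-form properties.
names_by_id = {
    feature["id"]: feature["properties"]["name"]
    for feature in feature_collection["features"]
}
print(names_by_id)
```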
Functions for retrieving features from the layer include:
- get_features
- search_features
- iter_features
- get_features_in_bounding_box
- spatial_search
- spatial_search_geometry
Example: Read multiple features from an Interactive Map layer using get_features
features = interactive_map_layer.get_features(feature_ids=["feature_id1", "feature_id2", "feature_id3"])
Example: Search for and retrieve features based on properties using search_features
features = interactive_map_layer.search_features(params={"p.name": "foo", "p.type": "bar"})
Example: Retrieve all features in a layer using iter_features
for feature in interactive_map_layer.iter_features():
    print(feature)
Example: Find and retrieve all features in a bounding box using get_features_in_bounding_box
bbox_features = interactive_map_layer.get_features_in_bounding_box(bounds=(-171.791110603, 18.91619, -66.96466, 71.3577635769))
Example: Find and retrieve all features within a given radius of an input point using spatial_search
features = interactive_map_layer.spatial_search(lng=-95.95417, lat=41.6065, radius=1000)
Example: Find and retrieve all features within an arbitrary geometry using spatial_search_geometry
from shapely.geometry import Polygon
polygon = Polygon([(0, 0), (1, 1), (1, 0)])
features = interactive_map_layer.spatial_search_geometry(geometry=polygon)
Interactive Map layer subscriptions to Destination Stream layer
Functions for Interactive Map layer (IML) subscriptions to a destination stream layer include:
- get_all_subscriptions
- subscribe
- subscription_status
- subscription_exists
- get_subscription
- unsubscribe
Example: List all subscriptions using get_all_subscriptions
subscriptions_gen = interactive_map_layer.get_all_subscriptions(limit=10)
while True:
    try:
        subscriptions_list = next(subscriptions_gen)
        print([subscription.subscription_hrn for subscription in subscriptions_list])
    except StopIteration:
        break
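The while/next/StopIteration pattern above is what a for loop does implicitly over any generator. The sketch below shows the equivalent loop on a hypothetical stand-in generator, fake_subscription_pages, which yields lists of made-up HRN strings; it assumes, as the example suggests, that the real generator yields one list of subscriptions per step.

```python
from typing import Iterator, List


def fake_subscription_pages() -> Iterator[List[str]]:
    """Hypothetical stand-in: yields lists (pages) of subscription HRN strings."""
    yield ["hrn:example:sub-1", "hrn:example:sub-2"]
    yield ["hrn:example:sub-3"]


all_hrns: List[str] = []
# The for loop calls next() and handles StopIteration for us.
for page in fake_subscription_pages():
    all_hrns.extend(page)

print(all_hrns)
```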
Example: Subscribe to Destination Stream Layer using subscribe
import uuid
from here.platform.layer import InteractiveMapSubscriptionType
x_idempotency_key=str(uuid.uuid4())
interactive_map_subscription = interactive_map_layer.subscribe(subscription_name="test-iml-subscription-name-123",
                                                               description="this is a test iml subscription",
                                                               destination_catalog_hrn="hrn:here:data::olp-cs:pysdk-test-catalog-for-iml-subs",
                                                               destination_layer_id="stream-raw-demo",
                                                               interactive_map_subscription_type=InteractiveMapSubscriptionType.PER_FEATURE)
interactive_map_subscription
Example: Get subscription status using subscription_status
subscription_status = interactive_map_layer.subscription_status(status_token=interactive_map_subscription.status_token)
subscription_status, subscription_status.status
Example: Check whether a subscription exists using subscription_exists
subscription_exists = interactive_map_layer.subscription_exists(subscription_hrn=interactive_map_subscription.subscription_hrn)
subscription_exists
Example: Get a subscription using get_subscription
subscription = interactive_map_layer.get_subscription(subscription_hrn=interactive_map_subscription.subscription_hrn)
subscription, subscription.subscription_name, subscription.subscription_hrn
Example: Unsubscribe from Destination Stream Layer using unsubscribe
import uuid
x_idempotency_key=str(uuid.uuid4())
unsubscribed = interactive_map_layer.unsubscribe(subscription_hrn=interactive_map_subscription.subscription_hrn)
unsubscribed, unsubscribed.status_token
Read data from object store layer
Object Store layers offer a key/value store which mimics the behavior of a filesystem. Keys equate to the file path+name (relative to root of the layer). Corresponding values are bytes equating to the contents of said file. Layers of this type do not have the concept of partitions.
Functions provided for retrieving key/value information from Object Store layers include:
- key_exists
- list_keys
- iter_keys
- get_object_metadata
- read_object
Example: Check if layer contains a given key with key_exists
key_found = objectstore_layer.key_exists(key = "berlin/districts_of_berlin_tiled/23618356")
Example: List all keys (subdirectories and files) under a given parent key (directory) with list_keys
The optional parent parameter specifies which level in the layer's key hierarchy to use as root for the listing. If omitted, the root of the layer itself is used. The deep parameter indicates whether you want to retrieve the entire hierarchy of descendants (True) or only direct descendants (False). If omitted, deep is assumed to be False. This method will return a list of keys as strings.
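To make the parent and deep semantics concrete, the sketch below models them over an in-memory list of keys. It is an illustration of one plausible reading of the description above, not the SDK's implementation: list_keys_sketch is a hypothetical helper, the keys berlin/readme.txt and paris/overview.json are made up, and the exact form of the strings returned by the real list_keys (for example, whether directories are included when deep is True) may differ.

```python
from typing import Iterable, List


def list_keys_sketch(all_keys: Iterable[str], parent: str = "", deep: bool = False) -> List[str]:
    """Illustrative only: list keys under `parent` from an in-memory set of file keys."""
    prefix = parent.rstrip("/") + "/" if parent else ""
    found = set()
    for key in all_keys:
        if not key.startswith(prefix):
            continue
        remainder = key[len(prefix):]
        if deep:
            # Every descendant file, at any depth.
            found.add(key)
        else:
            # Keep only the first path segment below the parent.
            found.add(prefix + remainder.split("/", 1)[0])
    return sorted(found)


keys = [
    "berlin/districts_of_berlin_tiled/23618355",
    "berlin/districts_of_berlin_tiled/23618356",
    "berlin/readme.txt",
    "paris/overview.json",
]
direct = list_keys_sketch(keys, parent="berlin")
everything = list_keys_sketch(keys, parent="berlin", deep=True)
print(direct)
print(everything)
```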
everything_under_berlin_folder = objectstore_layer.list_keys(parent = "berlin", deep = True)
Example: Get an Iterator over all keys under a given parent key (directory) with iter_keys
This method is analogous to list_keys, but returns an Iterator instead of a List.
berlin_files_iter = objectstore_layer.iter_keys(parent = "berlin", deep = True)
Example: Get metadata for an object with get_object_metadata
This method returns metadata associated with a given key. The information contained includes:
- Last-Modified date
- Data size
- Content type
- Content encoding
obj_metadata = objectstore_layer.get_object_metadata("berlin/districts_of_berlin_tiled/23618355")
Example: Read object for a given key with read_object
The read_object method offers an optional include_metadata parameter which specifies whether or not to also return the metadata associated with an object. By default, the value is False and read_object will return only the content requested. If True, a tuple containing content plus object_metadata will be returned. If stream is True, the data will be returned as Iterator[bytes] rather than bytes.
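Since the return shape depends on include_metadata, calling code that must handle both cases can normalize the result first. The sketch below is a hypothetical helper, split_result, exercised with made-up stand-in values; the dict used for the metadata is an assumption for illustration, and the real metadata object returned by the SDK may be a different type.

```python
from typing import Any, Optional, Tuple


def split_result(result: Any) -> Tuple[Any, Optional[Any]]:
    """Return (content, metadata); metadata is None when only content was returned."""
    if isinstance(result, tuple):
        content, metadata = result
        return content, metadata
    return result, None


# Stand-in for a call made with the default include_metadata=False.
content_only = b'{"type": "FeatureCollection"}'
# Stand-in for a call made with include_metadata=True (metadata shape is assumed).
content_with_metadata = (b'{"type": "FeatureCollection"}', {"content_type": "application/geo+json"})

content_a, metadata_a = split_result(content_only)
content_b, metadata_b = split_result(content_with_metadata)
print(metadata_a, metadata_b)
```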
geojson_content = objectstore_layer.read_object("berlin/districts_of_berlin_tiled/23618355")