How to read and write to the Object store layer using Hadoop FS support in Spark
Objectives: Understand how to use Hadoop FS Support to read and write data to the Object store layer using Spark.
Complexity: Beginner
Time to complete: 30 min
Prerequisites: Organize your work in projects
Source code: Download
The example in this tutorial demonstrates how to use the Hadoop FS Support component provided by the Data Client Library. It provides access to data stored in the Object store layer through standard tools such as Apache Spark, with minimal custom code.
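At its core, Hadoop FS Support exposes the Object store layer through the blobfs:// URI scheme, so paths of the form blobfs://<catalog-hrn>:<layer-id>/<key> work anywhere Spark accepts a file system path. The following is a minimal sketch of the round trip; roundTrip, catalogHrn, layerId, and the key prefix are illustrative placeholders rather than names from this tutorial:
import org.apache.spark.sql.{DataFrame, SparkSession}
// Minimal sketch: write a DataFrame to an Object store layer and read it back.
// catalogHrn and layerId must refer to an existing catalog and layer.
def roundTrip(spark: SparkSession, df: DataFrame, catalogHrn: String, layerId: String): DataFrame = {
  val path = s"blobfs://$catalogHrn:$layerId/some-key-prefix"
  df.write.parquet(path)   // goes through the blobfs:// Hadoop file system
  spark.read.parquet(path) // reads the same objects back
}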
The tutorial has the following steps:
- Create a catalog with the Object store layer.
- Write an application to generate test data in the parquet format.
- Write the test data to the Object store layer, using the hadoop-fs-support library.
- Read the test data from the Object store layer, using the hadoop-fs-support library.
- Access the data stored in the Object store layer, using the CLI.
As a preparation step, you must create your catalog with the Object store layer type.
Set up the Maven project
Create the following folder structure for the project:
hadoop-fs-support-spark-pipeline
└── src
└── main
├── java
└── resources
└── scala
You can do this with a single bash command:
mkdir -p hadoop-fs-support-spark-pipeline/src/main/{java,resources,scala}
Create catalog
You must create your catalog. You can accomplish this by following the steps outlined in Organize your work in projects, using the OLP Command Line Interface (CLI).
Use a unique identifier for the catalog, for example, $YOUR_USERNAME-hadoop-fs-support-spark-pipeline.
Create a file called hadoop-fs-support-spark-pipeline.json with the following contents, replacing $YOUR_CATALOG_ID with your chosen identifier.
{
"id": "hadoop-fs-support-spark-pipeline-catalog",
"name": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
"summary": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
"description": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
"tags": ["Hadoop FS Support", "Object store"],
"layers": [
{
"id": "parquet",
"name": "parquet-layer",
"summary": "Simulated data.",
"description": "Simulated parquet data to demonstrate usability of Object store layer",
"tags": ["Hadoop FS Support", "Object store"],
"layerType": "objectstore",
"volume": {
"volumeType": "durable"
}
}
]
}
Note: If a billing tag is required in your realm, update the config file by adding the billingTags: ["YOUR_BILLING_TAG"] property to the layer section.
Replace $YOUR_CATALOG_ID with your own identifier. Also, replace $YOUR_PROJECT_HRN with the
HRN of your project from Organize your work in projects, then run the following command:
#!/usr/bin/env bash
set -o nounset -o errexit -o xtrace
### [catalog]
olp catalog create {{YOUR_CATALOG_ID}} \
"Tutorial for reading and writing data to Object store layer using Hadoop FS Support ({{YOUR_USERNAME}})" \
--config hadoop-fs-support-spark-pipeline.json \
--scope {{YOUR_PROJECT_HRN}}
### [catalog]
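Optionally, confirm that the catalog exists and note its HRN before continuing. This is a minimal check, assuming the olp catalog show command and the --scope option behave this way in your CLI version:
olp catalog show {{YOUR_CATALOG_HRN}} --scope {{YOUR_PROJECT_HRN}}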
Set up pipeline configurations
- Create a file named pipeline-config.conf, and populate it with the following snippet.
- Replace $YOUR_CATALOG_HRN with the HRN of the catalog you created in Organize your work in projects.
This tutorial can also run outside of the pipeline environment. The only difference is that your application code must set the value of catalogHrn to the HRN you received from the catalog creation step, as shown in the sketch after the following snippet.
pipeline.config {
output-catalog {hrn = "hrn:here:data::olp-here:dummy"}
input-catalogs {
objectStoreCatalog { hrn = "{{YOUR_CATALOG_HRN}}" }
}
}
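A minimal sketch of how application code might choose between the two modes follows; the runsOnPipeline flag and the hard-coded placeholder HRN are illustrative assumptions, not part of the Data Client Library API:
import com.here.platform.pipeline.PipelineContext
object CatalogHrnResolver {
  // Illustrative placeholder; use the HRN returned by your catalog creation step.
  private val LocalObjectStoreHrn = "hrn:here:data::YOUR_REALM:YOUR_CATALOG_ID"
  // On a pipeline, read the HRN from pipeline-config.conf via the PipelineContext;
  // outside the pipeline environment, fall back to the hard-coded HRN.
  def resolveCatalogHrn(runsOnPipeline: Boolean): String =
    if (runsOnPipeline) new PipelineContext().config.inputCatalogs("objectStoreCatalog").toString
    else LocalObjectStoreHrn
}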
Set up the project in Maven
To develop an application that runs on pipelines with Spark, use sdk-batch-bom_2.13 as the parent POM of the application:
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-batch-bom_2.13</artifactId>
<version>2.85.8</version>
<relativePath/>
</parent>
Adjust the dependencies for Scala and Java:
<dependencies>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>hadoop-fs-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>at.yawk.lz4</groupId>
<artifactId>lz4-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
</dependencies>
Implement the application
This application writes parquet data to the Object store layer using Hadoop FS Support, and then runs a simple Spark SQL query to select the rows for which the tileId column has an even value.
/*
* Copyright (c) 2018-2026 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory
object HadoopFsSupportSparkScala {
def main(args: Array[String]): Unit = {
val logger = LoggerFactory.getLogger(HadoopFsSupportSparkScala.getClass)
val pipelineContext = new PipelineContext
val catalogHrn = pipelineContext.config.inputCatalogs("objectStoreCatalog").toString
val layerId = "parquet"
val sparkSession =
SparkSession
.builder()
.appName("HadoopFsSupportSparkScala")
.getOrCreate()
val dataFrame = generateDataFrame(sparkSession)
// Write the data frame as parquet files under a directory key in the object store layer
val parquetDir = "parquet-dir"
logger.info(s"Writing parquet files at: blobfs://$catalogHrn:$layerId/$parquetDir")
dataFrame.write
.parquet(s"blobfs://$catalogHrn:$layerId/$parquetDir")
// Read parquet data from the object store layer, selecting only the rows where the tileId column has an even value
logger.info(s"Reading parquet files at: blobfs://$catalogHrn:$layerId/$parquetDir")
val parquetData: DataFrame =
sparkSession.read
.parquet(s"blobfs://$catalogHrn:$layerId/$parquetDir")
.select("index")
.where("tileId % 2 == 0")
.sort("index")
val parquetArray: Array[Row] = parquetData.collect()
printDfArray(parquetArray)
}
// Generate test data in csv format
private def generateDataFrame(sparkSession: SparkSession): DataFrame = {
import sparkSession.implicits._
val csvData: Dataset[String] = sparkSession.sparkContext.parallelize("""
|index,tileId
|0,0
|1,1
|2,3
|3,6
|4,4
|5,5
|6,7
|7,2
|8,8
|9,10
""".stripMargin.lines.toArray.toSeq.map(_.toString)).toDS
sparkSession.read
.option("header", value = true)
.option("timestampFormat", "MM/dd/yyyy")
.option("inferSchema", value = true)
.csv(csvData)
}
// Print data frame
private def printDfArray(rowArray: Array[Row]): Unit =
for (row <- rowArray) {
for (i <- 0 until row.size) {
println(row.get(i))
}
}
}
The same application, implemented in Java:
/*
* Copyright (c) 2018-2026 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.platform.pipeline.PipelineContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HadoopFsSupportSparkJava {
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(HadoopFsSupportSparkJava.class);
PipelineContext pipelineContext = new PipelineContext();
String catalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("objectStoreCatalog").toString();
String layerId = "parquet";
SparkSession sparkSession =
SparkSession.builder().appName("HadoopFsSupportSparkJava").getOrCreate();
Dataset<Row> dataFrame = generateDataFrame(sparkSession);
// Write the data frame as parquet files under a directory key in the object store layer
String parquetDir = "parquet-dir";
logger.info(
"Writing parquet files at: blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
dataFrame.write().parquet("blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
// Read parquet data from the object store layer, selecting only the rows where the tileId column
// has an even value
logger.info(
"Reading parquet files at: blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
Dataset<Row> parquetData =
sparkSession
.read()
.parquet("blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir)
.select("index")
.where("tileId % 2 == 0")
.sort("index");
List<Row> parquetArray = parquetData.collectAsList();
printDfArray(parquetArray);
}
// Generate test data in csv format
private static Dataset<Row> generateDataFrame(SparkSession sparkSession) {
String csvString =
String.join(
"\n",
"index,tileId",
"0,0",
"1,1",
"2,3",
"3,6",
"4,4",
"5,5",
"6,7",
"7,2",
"8,8",
"9,10");
ArrayList<String> data = new ArrayList<>(Arrays.asList(csvString.split("\n")));
Dataset<String> csvData = sparkSession.createDataset(data, Encoders.STRING());
return sparkSession
.read()
.option("header", true)
.option("timestampFormat", "MM/dd/yyyy")
.option("inferSchema", true)
.csv(csvData);
}
// Print data frame
private static void printDfArray(List<Row> rowArray) {
for (Row row : rowArray) {
for (int i = 0; i < row.length(); i++) {
System.out.println(row.get(i));
}
}
}
}
Run the application
To run the Scala application locally, use the following command:
mvn compile exec:exec \
-Dexec.args="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
-cp %classpath \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope=$YOUR_PROJECT_HRN \
-Dspark.master="local[*]" HadoopFsSupportSparkScala"
To run the Java application locally:
mvn compile exec:exec \
-Dexec.args="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
-cp %classpath \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope=$YOUR_PROJECT_HRN \
-Dspark.master="local[*]" HadoopFsSupportSparkJava"
Consider the set of -Dhere.platform.data-client.request-signer.credentials.here-account.* parameters. We specify these parameters to pass data from the credentials.properties file and $YOUR_PROJECT_HRN to the Data Client Library. For more details about initializing the Data Client Library, see Set Your Credentials via Java System Properties.
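With the sample data above, both applications should print the index values 0, 3, 4, 7, 8, and 9, one per line: these are the rows whose tileId values (0, 6, 4, 2, 8, and 10) are even.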
Access the data using CLI
Using the HRN of the catalog, which you created in the catalog creation step, you can look at the contents of the Object store layer:
olp catalog layer object list $YOUR_CATALOG_HRN parquet --key parquet-dir
The CLI output should be similar to the following (sample output; your exact names and timestamps will differ):
name keyType lastModified size
parquet-dir/_SUCCESS object 2021-01-12T19:23:47Z 0
parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet object 2021-01-12T19:23:39Z 353
...
You can download the data stored in the Object store layer referenced by a certain key:
olp catalog layer object get $YOUR_CATALOG_HRN parquet --key parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet
Further information
For additional details on the topics covered in this tutorial, refer to the following sources:
- For information on the Object store layer type, see the Data Service documentation.
- For details on the Hadoop FS Support library, see the Data Client Library Developer Guide.
- For details on the command line interface interactions available for the Object store layer, see the Command Line Interface Developer Guide.