AWS-powered data lakes, supported by the unmatched availability of Amazon Simple Storage Service (Amazon S3), can handle the scale, agility, and flexibility required to combine different data and analytics approaches. As data lakes have grown in size and matured in usage, a significant amount of effort can be spent keeping the data consistent with business events. To ensure files are updated in a transactionally consistent manner, a growing number of customers are using open-source transactional table formats such as Apache Iceberg, Apache Hudi, and Linux Foundation Delta Lake that help you store data with high compression rates, natively interface with your applications and frameworks, and simplify incremental data processing in data lakes built on Amazon S3. These formats enable ACID (atomicity, consistency, isolation, durability) transactions, upserts, and deletes, as well as advanced features such as time travel and snapshots that were previously only available in data warehouses. Each storage format implements this functionality in slightly different ways; for a comparison, refer to Choosing an open table format for your transactional data lake on AWS.
In 2023, AWS announced general availability for Apache Iceberg, Apache Hudi, and Linux Foundation Delta Lake in Amazon Athena for Apache Spark, which removes the need to install a separate connector or associated dependencies and manage versions, and simplifies the configuration steps required to use these frameworks.
In this post, we show you how to use Spark SQL in Amazon Athena notebooks and work with Iceberg, Hudi, and Delta Lake table formats. We demonstrate common operations such as creating databases and tables, inserting data into the tables, querying data, and looking at snapshots of the tables in Amazon S3 using Spark SQL in Athena.
Prerequisites
Complete the following prerequisites:
Download and import example notebooks from Amazon S3
To follow along, download the notebooks discussed in this post from the following locations:
After you download the notebooks, import them into your Athena Spark environment by following the To import a notebook section in Managing notebook files.
Navigate to the specific open table format section
If you are interested in the Iceberg table format, navigate to the Working with Apache Iceberg tables section.
If you are interested in the Hudi table format, navigate to the Working with Apache Hudi tables section.
If you are interested in the Delta Lake table format, navigate to the Working with Linux Foundation Delta Lake tables section.
Working with Apache Iceberg tables
When using Spark notebooks in Athena, you can run SQL queries directly without having to use PySpark. We do this by using cell magics, which are special headers in a notebook cell that change the cell's behavior. For SQL, we can add the %%sql magic, which will interpret the entire cell contents as a SQL statement to be run on Athena.
In this section, we show how you can use SQL on Apache Spark for Athena to create, analyze, and manage Apache Iceberg tables.
Set up a notebook session
To use Apache Iceberg in Athena, while creating or editing a session, select the Apache Iceberg option by expanding the Apache Spark properties section. This pre-populates the properties as shown in the following screenshot.
For steps, see Editing session details or Creating your own notebook.
The code used in this section is available in the SparkSQL_iceberg.ipynb file to follow along.
Create a database and Iceberg table
First, we create a database in the AWS Glue Data Catalog. With the following SQL, we can create a database called icebergdb:
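The original code cell is not reproduced in this post; the statement might look like the following:

```sql
CREATE DATABASE icebergdb
```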
Next, in the database icebergdb, we create an Iceberg table called noaa_iceberg pointing to a location in Amazon S3 where we will load the data. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:
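The original DDL is not reproduced here; a sketch of what it might look like, assuming a simplified NOAA weather schema (the column names station, name, date, year, tmin, and tmax are illustrative assumptions, not the post's actual schema):

```sql
CREATE TABLE icebergdb.noaa_iceberg (
  station string,
  name    string,
  date    string,
  year    string,
  tmin    int,
  tmax    int
)
USING iceberg
PARTITIONED BY (year)
LOCATION 's3://<your-S3-bucket>/<prefix>/'
```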
Insert data into the table
To populate the noaa_iceberg Iceberg table, we insert data from the Parquet table sparkblogdb.noaa_pq that was created as part of the prerequisites. You can do this using an INSERT INTO statement in Spark:
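Assuming the Iceberg table and the source Parquet table share the same schema, the insert might look like this:

```sql
INSERT INTO icebergdb.noaa_iceberg
SELECT * FROM sparkblogdb.noaa_pq
```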
Alternatively, you can use CREATE TABLE AS SELECT with the USING iceberg clause to create an Iceberg table and insert data from a source table in one step:
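A sketch of the CTAS variant (the table name noaa_iceberg_ctas is an illustrative assumption):

```sql
CREATE TABLE icebergdb.noaa_iceberg_ctas
USING iceberg
LOCATION 's3://<your-S3-bucket>/<prefix>/'
AS SELECT * FROM sparkblogdb.noaa_pq
```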
Query the Iceberg table
Now that the data is inserted in the Iceberg table, we can start analyzing it. Let's run a Spark SQL query to find the minimum recorded temperature by year for the 'SEATTLE TACOMA AIRPORT, WA US' location:
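Assuming illustrative column names name, year, and tmin (minimum temperature), the query might look like this:

```sql
SELECT year, MIN(tmin) AS minimum_temperature
FROM icebergdb.noaa_iceberg
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
GROUP BY year
ORDER BY year
```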
We get the following output.
Update data in the Iceberg table
Let's look at how to update data in our table. We want to update the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. Using Spark SQL, we can run an UPDATE statement against the Iceberg table:
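Assuming the station name is stored in a column called name (an illustrative assumption), the UPDATE might look like this:

```sql
UPDATE icebergdb.noaa_iceberg
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
```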
We can then run the previous SELECT query to find the minimum recorded temperature for the 'Sea-Tac' location:
We get the following output.
Compact data files
Open table formats like Iceberg work by creating delta changes in file storage and tracking the versions of rows through manifest files. More data files leads to more metadata stored in manifest files, and small data files often cause an unnecessary amount of metadata, resulting in less efficient queries and higher Amazon S3 access costs. Running Iceberg's rewrite_data_files procedure in Spark for Athena will compact data files, combining many small delta change files into a smaller set of read-optimized Parquet files. Compacting files speeds up the read operation when the table is queried. To run compaction on our table, run the following Spark SQL:
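Iceberg exposes rewrite_data_files as a Spark procedure invoked with CALL; a sketch (the spark_catalog catalog name is an assumption about the Athena Spark session's default catalog):

```sql
CALL spark_catalog.system.rewrite_data_files(
  table => 'icebergdb.noaa_iceberg'
)
```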
rewrite_data_files offers options to specify your sort strategy, which can help reorganize and compact data.
List table snapshots
Every write, update, delete, upsert, and compaction operation on an Iceberg table creates a new snapshot of the table while keeping the old data and metadata around for snapshot isolation and time travel. To list the snapshots of an Iceberg table, run the following Spark SQL statement:
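Iceberg exposes snapshots through a metadata table nested under the table name; querying it might look like this:

```sql
SELECT * FROM icebergdb.noaa_iceberg.snapshots
```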
Expire old snapshots
Regularly expiring snapshots is recommended to delete data files that are no longer needed and to keep the size of the table metadata small. It will never remove files that are still required by a non-expired snapshot. In Spark for Athena, run the following SQL to expire snapshots for the table icebergdb.noaa_iceberg that are older than a specific timestamp:
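Iceberg's expire_snapshots procedure takes the table and a cutoff timestamp; a sketch (the catalog name and the timestamp value are illustrative):

```sql
CALL spark_catalog.system.expire_snapshots(
  table => 'icebergdb.noaa_iceberg',
  older_than => TIMESTAMP '2023-11-30 00:00:00.000'
)
```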
Note that the timestamp value is specified as a string in the format yyyy-MM-dd HH:mm:ss.fff. The output will give a count of the number of data and metadata files deleted.
Drop the table and database
You can run the following Spark SQL to clean up the Iceberg tables and associated data in Amazon S3 from this exercise:
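A sketch of the cleanup; PURGE asks Iceberg to delete the underlying data files along with the table metadata:

```sql
DROP TABLE icebergdb.noaa_iceberg PURGE
```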
Run the following Spark SQL to remove the database icebergdb:
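The statement might look like this:

```sql
DROP DATABASE icebergdb
```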
To learn more about all the operations you can perform on Iceberg tables using Spark for Athena, refer to Spark Queries and Spark Procedures in the Iceberg documentation.
Working with Apache Hudi tables
Next, we show how you can use SQL on Spark for Athena to create, analyze, and manage Apache Hudi tables.
Set up a notebook session
To use Apache Hudi in Athena, while creating or editing a session, select the Apache Hudi option by expanding the Apache Spark properties section.
For steps, see Editing session details or Creating your own notebook.
The code used in this section is available in the SparkSQL_hudi.ipynb file to follow along.
Create a database and Hudi table
First, we create a database called hudidb that will be stored in the AWS Glue Data Catalog, followed by Hudi table creation:
We create a Hudi table pointing to a location in Amazon S3 where we will load the data. Note that the table is of copy-on-write type. It is defined by type='cow' in the table DDL. We have defined station and date as the multiple primary keys and preCombineField as year. Also, the table is partitioned on year. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:
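A sketch of the two statements, assuming illustrative NOAA column names (station, name, date, tmin, tmax, year), with the keys, preCombine field, type, and partitioning as described above:

```sql
CREATE DATABASE hudidb;

CREATE TABLE hudidb.noaa_hudi (
  station string,
  name    string,
  date    string,
  tmin    int,
  tmax    int,
  year    string
)
USING hudi
PARTITIONED BY (year)
LOCATION 's3://<your-S3-bucket>/<prefix>/'
TBLPROPERTIES (
  type            = 'cow',
  primaryKey      = 'station,date',
  preCombineField = 'year'
)
```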
Insert data into the table
As with Iceberg, we use the INSERT INTO statement to populate the table by reading data from the sparkblogdb.noaa_pq table created in the previous post:
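With a Hudi table in Spark SQL, the partition column goes last in the select list; a sketch with illustrative column names:

```sql
INSERT INTO hudidb.noaa_hudi
SELECT station, name, date, tmin, tmax, year
FROM sparkblogdb.noaa_pq
```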
Query the Hudi table
Now that the table is created, let's run a query to find the maximum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:
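Assuming an illustrative tmax column for the maximum temperature, the query might look like this:

```sql
SELECT MAX(tmax) AS maximum_temperature
FROM hudidb.noaa_hudi
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
```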
Update data in the Hudi table
Let's change the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. We can run an UPDATE statement on Spark for Athena to update the records of the noaa_hudi table:
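Assuming an illustrative name column for the station name:

```sql
UPDATE hudidb.noaa_hudi
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
```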
We run the previous SELECT query to find the maximum recorded temperature for the 'Sea-Tac' location:
Run time travel queries
We can use time travel queries in SQL on Athena to analyze past data snapshots. For example:
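Hudi supports Spark SQL's TIMESTAMP AS OF clause for time travel; a sketch with illustrative column names and timestamp:

```sql
SELECT name, date, tmax
FROM hudidb.noaa_hudi TIMESTAMP AS OF '2023-12-01 10:00:00.000'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
```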
This query checks the Seattle airport temperature data as of a specific time in the past. The timestamp clause lets us travel back without altering current data. Note that the timestamp value is specified as a string in the format yyyy-MM-dd HH:mm:ss.fff.
Optimize query speed with clustering
To improve query performance, you can perform clustering on Hudi tables using SQL in Spark for Athena:
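Hudi exposes clustering as a SQL procedure; a sketch:

```sql
CALL run_clustering(table => 'hudidb.noaa_hudi')
```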
Compact tables
Compaction is a table service employed by Hudi specifically in Merge On Read (MOR) tables to periodically merge updates from row-based log files into the corresponding columnar-based base file, producing a new version of the base file. Compaction is not applicable to Copy On Write (COW) tables and only applies to MOR tables. You can run the following query in Spark for Athena to perform compaction on MOR tables:
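Because the noaa_hudi table in this post is COW, the following sketch targets a hypothetical MOR table (noaa_hudi_mor is an illustrative name) using Hudi's run_compaction procedure:

```sql
CALL run_compaction(op => 'run', table => 'hudidb.noaa_hudi_mor')
```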
Drop the table and database
Run the following Spark SQL to remove the Hudi table you created and associated data from the Amazon S3 location:
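A sketch; PURGE requests deletion of the underlying files along with the table:

```sql
DROP TABLE hudidb.noaa_hudi PURGE
```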
Run the following Spark SQL to remove the database hudidb:
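The statement might look like this:

```sql
DROP DATABASE hudidb
```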
To learn about all the operations you can perform on Hudi tables using Spark for Athena, refer to SQL DDL and Procedures in the Hudi documentation.
Working with Linux Foundation Delta Lake tables
Next, we show how you can use SQL on Spark for Athena to create, analyze, and manage Delta Lake tables.
Set up a notebook session
To use Delta Lake in Spark for Athena, while creating or editing a session, select Linux Foundation Delta Lake by expanding the Apache Spark properties section.
For steps, see Editing session details or Creating your own notebook.
The code used in this section is available in the SparkSQL_delta.ipynb file to follow along.
Create a database and Delta Lake table
In this section, we create a database in the AWS Glue Data Catalog. Using the following SQL, we can create a database called deltalakedb:
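The statement might look like this:

```sql
CREATE DATABASE deltalakedb
```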
Next, in the database deltalakedb, we create a Delta Lake table called noaa_delta pointing to a location in Amazon S3 where we will load the data. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:
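A sketch of the DDL, assuming illustrative NOAA column names (station, name, date, year, tmin, tmax), not the post's actual schema:

```sql
CREATE TABLE deltalakedb.noaa_delta (
  station string,
  name    string,
  date    string,
  year    string,
  tmin    int,
  tmax    int
)
USING delta
PARTITIONED BY (year)
LOCATION 's3://<your-S3-bucket>/<prefix>/'
```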
Insert data into the table
We use an INSERT INTO statement to populate the table by reading data from the sparkblogdb.noaa_pq table created in the previous post:
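Assuming the Delta Lake table and the source Parquet table share the same schema:

```sql
INSERT INTO deltalakedb.noaa_delta
SELECT * FROM sparkblogdb.noaa_pq
```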
You can also use CREATE TABLE AS SELECT to create a Delta Lake table and insert data from a source table in one query.
Query the Delta Lake table
Now that the data is inserted in the Delta Lake table, we can start analyzing it. Let's run a Spark SQL query to find the minimum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:
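Assuming illustrative column names name and tmin (minimum temperature):

```sql
SELECT MIN(tmin) AS minimum_temperature
FROM deltalakedb.noaa_delta
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
```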
Update data in the Delta Lake table
Let's change the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. We can run an UPDATE statement on Spark for Athena to update the records of the noaa_delta table:
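Assuming an illustrative name column for the station name:

```sql
UPDATE deltalakedb.noaa_delta
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
```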
We can run the previous SELECT query to find the minimum recorded temperature for the 'Sea-Tac' location, and the result should be the same as before:
Compact data files
In Spark for Athena, you can run OPTIMIZE on the Delta Lake table, which will compact the small files into larger files so queries are not burdened by small-file overhead. To perform the compaction operation, run the following query:
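In Delta Lake, OPTIMIZE is a plain SQL command:

```sql
OPTIMIZE deltalakedb.noaa_delta
```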
Refer to Optimizations in the Delta Lake documentation for the different options available when running OPTIMIZE.
Remove files no longer referenced by a Delta Lake table
You can remove files stored in Amazon S3 that are no longer referenced by a Delta Lake table and are older than the retention threshold by running the VACUUM command on the table using Spark for Athena:
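A sketch; without further options, VACUUM uses Delta Lake's default retention threshold:

```sql
VACUUM deltalakedb.noaa_delta
```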
Refer to Remove files no longer referenced by a Delta table in the Delta Lake documentation for the options available with VACUUM.
Drop the table and database
Run the following Spark SQL to remove the Delta Lake table you created:
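The statement might look like this:

```sql
DROP TABLE deltalakedb.noaa_delta
```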
Run the following Spark SQL to remove the database deltalakedb:
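The statement might look like this:

```sql
DROP DATABASE deltalakedb
```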
Running DROP TABLE DDL on the Delta Lake table and database deletes the metadata for these objects, but doesn't automatically delete the data files in Amazon S3. You can run the following Python code in the notebook's cell to delete the data from the S3 location:
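A sketch of the cleanup code, assuming the boto3 library is available in the notebook session (the bucket and prefix values are placeholders you must replace with your own):

```python
import boto3

# Placeholders: replace with your bucket and the prefix used in the table's LOCATION
bucket_name = '<your-S3-bucket>'
prefix = '<prefix>/'

# Delete every object stored under the table's prefix
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)
bucket.objects.filter(Prefix=prefix).delete()
```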
To learn more about the SQL statements that you can run on a Delta Lake table using Spark for Athena, refer to the quickstart in the Delta Lake documentation.
Conclusion
This post demonstrated how to use Spark SQL in Athena notebooks to create databases and tables, insert and query data, and perform common operations like updates, compactions, and time travel on Hudi, Delta Lake, and Iceberg tables. Open table formats add ACID transactions, upserts, and deletes to data lakes, overcoming limitations of raw object storage. By removing the need to install separate connectors, Spark on Athena's built-in integration reduces configuration steps and management overhead when using these popular frameworks for building reliable data lakes on Amazon S3. To learn more about selecting an open table format for your data lake workloads, refer to Choosing an open table format for your transactional data lake on AWS.
About the Authors
Pathik Shah is a Sr. Analytics Architect on Amazon Athena. He joined AWS in 2015 and has been focusing on the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.
Raj Devnath is a Product Manager at AWS on Amazon Athena. He is passionate about building products customers love and helping customers extract value from their data. His background is in delivering solutions for multiple end markets, such as finance, retail, smart buildings, home automation, and data communication systems.