
Build Spark Structured Streaming applications with the open source connector for Amazon Kinesis Data Streams


Apache Spark is a powerful big data engine used for large-scale data analytics. Its in-memory computing makes it well suited to iterative algorithms and interactive queries. You can use Apache Spark to process streaming data from a variety of streaming sources, including Amazon Kinesis Data Streams, for use cases like clickstream analysis, fraud detection, and more. Kinesis Data Streams is a serverless streaming data service that makes it easy to capture, process, and store data streams at any scale.

With the new open source Amazon Kinesis Data Streams Connector for Spark Structured Streaming, you can use the newer Spark Data Sources API. It also supports enhanced fan-out for dedicated read throughput and faster stream processing. In this post, we deep dive into the internal details of the connector and show you how to use it to consume and produce records from and to Kinesis Data Streams using Amazon EMR.

Introducing the Kinesis Data Streams connector for Spark Structured Streaming

The Kinesis Data Streams connector for Spark Structured Streaming is an open source connector that supports both provisioned and On-Demand capacity modes offered by Kinesis Data Streams. The connector is built using the latest Spark Data Sources API V2, which uses Spark optimizations. Starting with Amazon EMR 7.1, the connector comes pre-packaged on Amazon EMR on Amazon EKS, Amazon EMR on Amazon EC2, and Amazon EMR Serverless, so you don’t need to build or download any packages. To use it with other Apache Spark platforms, the connector is available as a public JAR file that can be referenced directly when submitting a Spark Structured Streaming job. Additionally, you can download and build the connector from the GitHub repo.

Kinesis Data Streams supports two types of consumers: shared throughput and dedicated throughput. With shared throughput, 2 MB/s of read throughput per shard is shared across consumers. With dedicated throughput, also known as enhanced fan-out, 2 MB/s of read throughput per shard is dedicated to each consumer. This new connector supports both consumer types out of the box without any additional coding, giving you the flexibility to consume records from your streams based on your requirements. By default, this connector uses a shared throughput consumer, but you can configure it to use enhanced fan-out in the configuration properties.
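
To make the difference between the two consumer types concrete, the following sketch estimates the read throughput one consumer can expect. It assumes the per-shard read limit of 2 MB/s and an even split among shared-throughput consumers; the function name and its parameters are illustrative and are not part of the connector.

```python
# Per-shard read limit for Kinesis Data Streams, in MB/s.
READ_MB_PER_SHARD = 2.0


def read_throughput_per_consumer(shards: int, consumers: int, enhanced_fan_out: bool) -> float:
    """Return the approximate read throughput (MB/s) available to one consumer."""
    if shards < 1 or consumers < 1:
        raise ValueError("shards and consumers must be positive")
    total = shards * READ_MB_PER_SHARD
    if enhanced_fan_out:
        # Dedicated throughput: every consumer gets the full per-shard limit.
        return total
    # Shared throughput: all consumers draw from the same per-shard limit
    # (assumed here to be split evenly).
    return total / consumers


shared = read_throughput_per_consumer(shards=4, consumers=3, enhanced_fan_out=False)
dedicated = read_throughput_per_consumer(shards=4, consumers=3, enhanced_fan_out=True)
print(f"shared: {shared:.2f} MB/s per consumer, dedicated: {dedicated:.2f} MB/s per consumer")
```

With several applications reading the same stream, the shared figure shrinks as consumers are added, while the dedicated figure stays constant.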

You can also use the connector as a sink connector to produce records to a Kinesis data stream. The configuration parameters for using the connector as a source and as a sink differ; for more information, see Kinesis Source Configuration. The connector also supports multiple storage options, including Amazon DynamoDB, Amazon Simple Storage Service (Amazon S3), and HDFS, to store checkpoints and provide continuity.

For scenarios where a Kinesis data stream is deployed in an AWS producer account and the Spark Structured Streaming application is in a different AWS consumer account, you can use the connector to do cross-account processing. This requires additional AWS Identity and Access Management (IAM) trust policies to allow the Spark Structured Streaming application in the consumer account to assume the role in the producer account.
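
As a rough illustration of such a trust policy, the following sketch builds the JSON document that would be attached to the role in the producer account. The account ID and role name are hypothetical placeholders, and the role would still need its own permissions policy for the stream, which is not shown here.

```python
import json

# Hypothetical values, for illustration only.
CONSUMER_ACCOUNT_ID = "111122223333"
CONSUMER_ROLE_NAME = "emr-spark-consumer-role"


def build_trust_policy(consumer_account_id: str, consumer_role_name: str) -> dict:
    """Build a trust policy for the role that lives in the producer account.

    The policy allows the named role in the consumer account to call
    sts:AssumeRole on the producer-account role.
    """
    principal_arn = f"arn:aws:iam::{consumer_account_id}:role/{consumer_role_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": principal_arn},
                "Action": "sts:AssumeRole",
            }
        ],
    }


trust_policy = build_trust_policy(CONSUMER_ACCOUNT_ID, CONSUMER_ROLE_NAME)
print(json.dumps(trust_policy, indent=2))
```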

You should also consider reviewing the security configuration with your security teams based on your data security requirements.

How the connector works

Consuming records from Kinesis Data Streams using the connector involves multiple steps. The following architecture diagram shows the internal details of how the connector works. A Spark Structured Streaming application consumes records from a Kinesis data stream source and produces records to another Kinesis data stream.

A Kinesis data stream is composed of a set of shards. A shard is a uniquely identified sequence of data records in a stream and provides a fixed unit of capacity. The total capacity of the stream is the sum of the capacity of all of its shards.
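
Because capacity is additive across shards, sizing a provisioned stream is simple arithmetic. The sketch below assumes the provisioned-mode per-shard write limits of 1 MB/s and 1,000 records per second; the helper names are illustrative.

```python
import math

# Per-shard write limits in provisioned mode.
WRITE_MB_PER_SHARD = 1.0
WRITE_RECORDS_PER_SHARD = 1000


def shards_needed(ingest_mb_per_s: float, ingest_records_per_s: float) -> int:
    """Return the smallest shard count that satisfies both write limits."""
    by_bytes = math.ceil(ingest_mb_per_s / WRITE_MB_PER_SHARD)
    by_records = math.ceil(ingest_records_per_s / WRITE_RECORDS_PER_SHARD)
    return max(1, by_bytes, by_records)


def write_capacity(shards: int) -> dict:
    """Total write capacity of a stream is the sum over its shards."""
    return {
        "mb_per_s": shards * WRITE_MB_PER_SHARD,
        "records_per_s": shards * WRITE_RECORDS_PER_SHARD,
    }


needed = shards_needed(ingest_mb_per_s=3.5, ingest_records_per_s=2500)
print(needed, write_capacity(needed))
```

In this example the byte rate, not the record rate, is the binding constraint.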

A Spark application consists of a driver and a set of executor processes. The Spark driver acts as a coordinator, and the tasks running in executors are responsible for producing and consuming records to and from shards.

The solution workflow includes the following steps:

  1. Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs. At the beginning of a micro-batch run, the driver uses the Kinesis Data Streams ListShards API to determine the latest description of all available shards. The connector exposes a parameter (kinesis.describeShardInterval) to configure the interval between two successive ListShards API calls.
  2. The driver then determines the starting position in each shard. If the application is a new job, the starting position of each shard is determined by kinesis.startingPosition. If it’s a restart of an existing job, the starting position is read from the last record metadata checkpoint in storage (for this post, DynamoDB), and kinesis.startingPosition is ignored.
  3. Each shard is mapped to one task in an executor, which is responsible for reading data. The Spark application automatically creates a number of tasks equal to the number of shards and distributes them across the executors.
  4. The tasks in an executor use either polling mode (shared) or push mode (enhanced fan-out) to get data records from the starting position for a shard.
  5. Spark tasks running in the executors write the processed data to the data sink. In this architecture, we use the Kinesis Data Streams sink to illustrate how the connector writes back to the stream. Executors can write to more than one Kinesis Data Streams output shard.
  6. At the end of each task, the corresponding executor process saves the metadata (checkpoint) about the last record read for each shard in the offset storage (for this post, DynamoDB). This information is used by the driver in the construction of the next micro-batch.
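
The planning logic in steps 2 and 3 can be modeled in a few lines of plain Python. This is a simplified illustration of the described behavior, not the connector’s code: the class and function names are hypothetical, and the fallback for a shard that has no checkpoint entry is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class ShardTask:
    """One read task, mapped to exactly one shard."""
    shard_id: str
    start: str  # a checkpointed sequence number or a starting position name


def plan_micro_batch(
    shard_ids: List[str],
    checkpoints: Optional[Dict[str, str]],
    starting_position: str = "LATEST",
) -> List[ShardTask]:
    """Create one task per shard and resolve where each task starts reading."""
    tasks = []
    for shard_id in shard_ids:
        checkpointed = (checkpoints or {}).get(shard_id)
        if checkpointed is not None:
            # Restarted job: the checkpoint wins and starting_position is ignored.
            start = f"AFTER_SEQUENCE_NUMBER:{checkpointed}"
        else:
            # New job (or, by assumption in this sketch, a shard without a checkpoint).
            start = starting_position
        tasks.append(ShardTask(shard_id, start))
    return tasks


shards = ["shardId-000000000000", "shardId-000000000001"]
new_job = plan_micro_batch(shards, checkpoints=None, starting_position="TRIM_HORIZON")
restart = plan_micro_batch(shards, checkpoints={s: "4957" for s in shards})
print(new_job)
print(restart)
```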

Solution overview

The following diagram shows an example architecture of how to use the connector to read from one Kinesis data stream and write to another.

In this architecture, we use the Amazon Kinesis Data Generator (KDG) to generate sample streaming data (random events per country) to a Kinesis Data Streams source. We start an interactive Spark Structured Streaming session and consume data from the Kinesis data stream, and then write to another Kinesis data stream.

We use Spark Structured Streaming to count events per micro-batch window. These events for each country are consumed from Kinesis Data Streams. After the count, we can see the results.
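
One way to produce such a per-batch count is a foreachBatch handler. The sketch below is a minimal example, assuming the country name arrives in a column named data (as in the event template used later in this post); the handler name is illustrative, and the commented-out lines show how it might be attached to a streaming DataFrame in the PySpark shell.

```python
def count_per_country(batch_df, batch_id):
    """Count the events of one micro-batch per country and print the result.

    batch_df is the DataFrame Spark passes to a foreachBatch handler;
    the "data" column is assumed to hold the country name.
    """
    rows = batch_df.groupBy("data").count().collect()
    counts = {row["data"]: row["count"] for row in rows}
    print(f"batch {batch_id}: {counts}")
    return counts


# In a PySpark shell, with `events` being a streaming DataFrame:
# query = events.writeStream.foreachBatch(count_per_country).start()
# query.awaitTermination()
```

Collecting to the driver is only reasonable here because the number of distinct countries is small.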

Prerequisites

To get started, follow the instructions in the GitHub repo. You need the following prerequisites:

After you deploy the solution using the AWS CDK, you’ll have the following resources:

  • An EMR cluster with the Kinesis Spark connector installed
  • A Kinesis Data Streams source
  • A Kinesis Data Streams sink

Create your Spark Structured Streaming application

After the deployment is complete, you can access the EMR primary node to start a Spark application and write your Spark Structured Streaming logic.

As we mentioned earlier, you use the new open source Kinesis Spark connector to consume data from Amazon EMR. You can find the connector code on the GitHub repo along with examples on how to build and set up the connector in Spark.

In this post, we use Amazon EMR 7.1, where the connector is natively available. If you’re not using Amazon EMR 7.1 or above, you can use the connector by running the following code:

cd /usr/lib/spark/jars 
sudo wget https://awslabs-code-us-east-1.s3.amazonaws.com/spark-sql-kinesis-connector/spark-streaming-sql-kinesis-connector_2.12-1.2.1.jar
sudo chmod 755 spark-streaming-sql-kinesis-connector_2.12-1.2.1.jar

Complete the following steps:

  1. On the Amazon EMR console, navigate to the emr-spark-kinesis cluster.
  2. On the Instances tab, select the primary instance and choose the Amazon Elastic Compute Cloud (Amazon EC2) instance ID.

You’re redirected to the Amazon EC2 console.

  1. On the Amazon EC2 console, select the primary instance and choose Connect.
  2. Use Session Manager, a capability of AWS Systems Manager, to connect to the instance.
  3. Because the user that is used to connect is the ssm-user, we need to switch to the Hadoop user:

  4. Start a Spark shell using either Scala or Python to interactively build a Spark Structured Streaming application to consume data from a Kinesis data stream.

For this post, we use Python for writing to a stream using a PySpark shell in Amazon EMR.

  1. Start the PySpark shell by entering the command pyspark.

Because you already have the connector installed in the EMR cluster, you can now create the Kinesis source.

  1. Create the Kinesis source with the following code:
    # Read from the source stream with the standard (shared throughput) consumer
    kinesis = (
        spark.readStream.format("aws-kinesis")
        .option("kinesis.region", "<aws-region>")
        .option("kinesis.streamName", "kinesis-source")
        .option("kinesis.consumerType", "GetRecords")
        .option("kinesis.endpointUrl", "https://kinesis.<aws-region>.amazonaws.com")
        .option("kinesis.startingPosition", "LATEST")
        .load()
    )

For creating the Kinesis source, the following parameters are required:

  • Name of the connector – We use the connector name aws-kinesis
  • kinesis.region – The AWS Region of the Kinesis data stream you are consuming
  • kinesis.consumerType – Use GetRecords (standard consumer) or SubscribeToShard (enhanced fan-out consumer)
  • kinesis.endpointUrl – The Regional Kinesis endpoint (for more details, see Service endpoints)
  • kinesis.startingPosition – Choose LATEST, TRIM_HORIZON, or AT_TIMESTAMP (refer to ShardIteratorType)

For using an enhanced fan-out consumer, additional parameters are needed, such as the consumer name. The additional configuration can be found in the connector’s GitHub repo.

# Read from the source stream with an enhanced fan-out (dedicated throughput) consumer
kinesis_efo = (
    spark.readStream.format("aws-kinesis")
    .option("kinesis.region", "<aws-region>")
    .option("kinesis.streamName", "kinesis-source")
    .option("kinesis.consumerType", "SubscribeToShard")
    .option("kinesis.consumerName", "efo-consumer")
    .option("kinesis.endpointUrl", "https://kinesis.<aws-region>.amazonaws.com")
    .option("kinesis.startingPosition", "LATEST")
    .load()
)
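
If you switch between the two consumer types often, it can help to build the option set in one place. The following helper is a hypothetical convenience, not part of the connector; it only uses the option keys shown above, checks that a consumer name is supplied for enhanced fan-out, and assumes the standard Regional endpoint pattern.

```python
from typing import Dict, Optional

VALID_CONSUMER_TYPES = {"GetRecords", "SubscribeToShard"}


def kinesis_source_options(
    region: str,
    stream_name: str,
    consumer_type: str = "GetRecords",
    consumer_name: Optional[str] = None,
    starting_position: str = "LATEST",
) -> Dict[str, str]:
    """Return the source options for a shared or enhanced fan-out consumer."""
    if consumer_type not in VALID_CONSUMER_TYPES:
        raise ValueError(f"unknown consumer type: {consumer_type}")
    options = {
        "kinesis.region": region,
        "kinesis.streamName": stream_name,
        "kinesis.consumerType": consumer_type,
        # Assumes the standard Regional endpoint pattern.
        "kinesis.endpointUrl": f"https://kinesis.{region}.amazonaws.com",
        "kinesis.startingPosition": starting_position,
    }
    if consumer_type == "SubscribeToShard":
        if not consumer_name:
            raise ValueError("enhanced fan-out requires a consumer name")
        options["kinesis.consumerName"] = consumer_name
    return options


efo_options = kinesis_source_options(
    "us-east-1", "kinesis-source", "SubscribeToShard", consumer_name="efo-consumer"
)
print(efo_options)

# In a PySpark shell:
# kinesis_efo = spark.readStream.format("aws-kinesis").options(**efo_options).load()
```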

Deploy the Kinesis Data Generator

Complete the following steps to deploy the KDG and start generating data:

  1. Choose Launch Stack.

You might need to change your Region when deploying. Make sure that the KDG is launched in the same Region as where you deployed the solution.

  1. For the parameters Username and Password, enter the values of your choice. Note these values to use later when you log in to the KDG.
  2. When the template has finished deploying, go to the Outputs tab of the stack and locate the KDG URL.
  3. Log in to the KDG, using the credentials you set when launching the CloudFormation template.
  4. Specify your Region and data stream name, and use the following template to generate test data:
    {
        "id": {{random.number(100)}},
        "data": "{{random.arrayElement(
            ["Spain","Portugal","Finland","France"]
        )}}",
        "date": "{{date.now("YYYY-MM-DD hh:mm:ss")}}"
    }

  5. Return to Systems Manager to continue working with the Spark application.
  6. To be able to apply transformations based on the fields of the events, you first need to define the schema for the events:
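    from pyspark.sql.types import LongType, StringType, StructType, TimestampType

    # Schema matching the three fields of the KDG template
    pythonSchema = (
        StructType()
        .add("id", LongType())
        .add("data", StringType())
        .add("date", TimestampType())
    )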

  7. Run the following command to consume data from Kinesis Data Streams:
    from pyspark.sql.functions import from_json

    # Cast the binary payload to a string, parse it as JSON, and flatten the fields
    events = (
        kinesis
        .selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", pythonSchema).alias("events"))
        .select("events.*")
    )

  8. Use the following code for the Kinesis Spark connector sink:
    # Use the event id as the partition key and write the events to the sink stream
    (
        events
        .selectExpr("CAST(id AS STRING) as partitionKey", "data", "date")
        .writeStream
        .format("aws-kinesis")
        .option("kinesis.region", "<aws-region>")
        .outputMode("append")
        .option("kinesis.streamName", "kinesis-sink")
        .option("kinesis.endpointUrl", "https://kinesis.<aws-region>.amazonaws.com")
        .option("checkpointLocation", "/kinesisCheckpoint")
        .start()
        .awaitTermination()
    )
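
To check the event shape without the KDG, the following sketch builds records with the same three fields as the template in step 4 and shows the row shape that the sink’s selectExpr produces. It is a local stand-in only: the function names are illustrative, the id range is assumed to be 0 to 100, and a 24-hour clock is used for the timestamp.

```python
import json
import random
from datetime import datetime

COUNTRIES = ["Spain", "Portugal", "Finland", "France"]


def make_event(rng: random.Random, now: datetime) -> dict:
    """Build one sample event with the fields id, data, and date."""
    return {
        "id": rng.randint(0, 100),  # assumed range for this sketch
        "data": rng.choice(COUNTRIES),
        "date": now.strftime("%Y-%m-%d %H:%M:%S"),
    }


def to_sink_row(event: dict) -> dict:
    """Mirror the sink projection: the id, cast to a string, becomes the partition key."""
    return {
        "partitionKey": str(event["id"]),
        "data": event["data"],
        "date": event["date"],
    }


rng = random.Random(7)  # fixed seed so repeated runs produce the same events
event = make_event(rng, datetime(2024, 1, 1, 10, 0, 0))
print(json.dumps(event))
print(to_sink_row(event))
```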

You can view the data in the Kinesis Data Streams console.

  1. On the Kinesis Data Streams console, navigate to kinesis-sink.
  2. On the Data viewer tab, choose a shard and a starting position (for this post, we use Latest) and choose Get records.

You can see the data sent, as shown in the following screenshot. Kinesis Data Streams uses base64 encoding by default, so you might see text with unreadable characters.
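
If you copy an encoded payload out of a tool or API response, you can decode it with the Python standard library. This is a minimal sketch; the sample value is made up for illustration and assumes the payload is UTF-8 text.

```python
import base64


def decode_payload(encoded: str) -> str:
    """Decode a base64-encoded record payload into UTF-8 text."""
    return base64.b64decode(encoded).decode("utf-8")


# Illustrative sample: encode a country name, then decode it again.
sample = base64.b64encode("Spain".encode("utf-8")).decode("ascii")
print(sample, "->", decode_payload(sample))
```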

Clean up

Delete the following CloudFormation stacks created during this deployment to delete all the provisioned resources:

  • EmrSparkKinesisStack
  • Kinesis-Data-Generator-Cognito-User-SparkEFO-Blog

If you created any additional resources during this deployment, delete them manually.

Conclusion

In this post, we discussed the open source Kinesis Data Streams connector for Spark Structured Streaming. It supports the newer Data Sources API V2 and Spark Structured Streaming for building streaming applications. The connector also enables high-throughput consumption from Kinesis Data Streams with enhanced fan-out by providing dedicated throughput of up to 2 MB/s per shard per consumer. With this connector, you can now build high-throughput streaming applications with Spark Structured Streaming.

The Kinesis Spark connector is open source under the Apache 2.0 license on GitHub. To get started, visit the GitHub repo.


About the Authors


Idan Maizlits is a Senior Product Manager on the Amazon Kinesis Data Streams team at Amazon Web Services. Idan loves engaging with customers to learn about their challenges with real-time data and to help them achieve their business goals. Outside of work, he enjoys spending time with his family exploring the outdoors and cooking.


Subham Rakshit is a Streaming Specialist Solutions Architect for Analytics at AWS based in the UK. He works with customers to design and build search and streaming data platforms that help them achieve their business objectives. Outside of work, he enjoys spending time solving jigsaw puzzles with his daughter.

Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers, helping them design real-time analytics architectures using AWS services, supporting Amazon MSK and AWS’s managed offering for Apache Flink.

Umesh Chaudhari is a Streaming Solutions Architect at AWS. He works with customers to design and build real-time data processing systems. He has extensive working experience in software engineering, including architecting, designing, and developing data analytics systems. Outside of work, he enjoys traveling, reading, and watching movies.
