Saturday, December 21, 2024

Back up and restore Kafka topic data using Amazon MSK Connect


You can use Apache Kafka to run your streaming workloads. Kafka provides resiliency to failures and protects your data out of the box by replicating data across the brokers of the cluster. This makes sure that the data in the cluster is durable. You can achieve your durability SLAs by changing the replication factor of the topic. However, streaming data stored in Kafka topics tends to be transient and typically has a retention time of days or weeks. You may want to back up the data stored in your Kafka topic long after its retention time expires for several reasons. For example, you might have compliance requirements that require you to store the data for several years. Or you may have curated synthetic data that needs to be repeatedly hydrated into Kafka topics before starting your workload’s integration tests. Or an upstream system that you don’t have control over produces bad data and you need to restore your topic to a previous known-good state.

Storing data indefinitely in Kafka topics is an option, but sometimes the use case requires a separate copy. Tools such as MirrorMaker let you back up your data into another Kafka cluster. However, this requires another active Kafka cluster to be running as a backup, which increases compute costs and storage costs. A cost-effective and durable way of backing up the data of your Kafka cluster is to use an object storage service like Amazon Simple Storage Service (Amazon S3).

In this post, we walk through a solution that lets you back up your data for cold storage using Amazon MSK Connect. We restore the backed-up data to another Kafka topic and reset the consumer offsets based on your use case.

Overview of solution

Kafka Connect is a component of Apache Kafka that simplifies streaming data between Kafka topics and external systems like object stores, databases, and file systems. It uses sink connectors to stream data from Kafka topics to external systems, and source connectors to stream data from external systems to Kafka topics. You can use off-the-shelf connectors written by third parties or write your own connectors to meet your specific requirements.

MSK Connect is a feature of Amazon Managed Streaming for Apache Kafka (Amazon MSK) that lets you run fully managed Kafka Connect workloads. It works with MSK clusters and with compatible self-managed Kafka clusters. In this post, we use the Lenses AWS S3 Connector to back up the data stored in a topic in an Amazon MSK cluster to Amazon S3 and restore this data back to another topic. The following diagram shows our solution architecture.

To implement this solution, we complete the following steps:

  1. Back up the data using an MSK Connect sink connector to an S3 bucket.
  2. Restore the data using an MSK Connect source connector to a new Kafka topic.
  3. Reset consumer offsets based on different scenarios.

Prerequisites

Make sure to complete the following steps as prerequisites:

  1. Set up the required resources for Amazon MSK, Amazon S3, and AWS Identity and Access Management (IAM).
  2. Create two Kafka topics in the MSK cluster: source_topic and target_topic.
  3. Create an MSK Connect plugin using the Lenses AWS S3 Connector.
  4. Install the Kafka CLI by following Step 1 of Apache Kafka Quickstart.
  5. Install the kcat utility to send test messages to the Kafka topic.

Back up your topics

Depending on the use case, you may want to back up all the topics in your Kafka cluster or only some specific topics. In this post, we cover how to back up a single topic, but you can extend the solution to back up multiple topics.

The format in which the data is stored in Amazon S3 is important. You may want to inspect the data that’s stored in Amazon S3 to debug issues like the introduction of bad data. You can examine data stored as JSON or plain text by using text editors and looking in the time frames that are of interest to you. You can also examine large amounts of data stored in Amazon S3 as JSON or Parquet using AWS services like Amazon Athena. The Lenses AWS S3 Connector supports storing objects as JSON, Avro, Parquet, plaintext, or binary.

In this post, we send JSON data to the Kafka topic and store it in Amazon S3. Depending on the data type that meets your requirements, update the connect.s3.kcql statement and *.converter configuration. You can refer to the Lenses sink connector documentation for details of the formats supported and the related configurations. If the existing connectors don’t work for your use case, you can also write your own connector or extend existing connectors. You can partition the data stored in Amazon S3 based on fields of primitive types in the message header or payload. We use the date fields stored in the header to partition the data on Amazon S3.
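To make the relationship between the storage format and the connect.s3.kcql statement concrete, here is a minimal shell sketch that assembles the sink statement used in this post for a chosen format. The helper name build_sink_kcql and the bucket name are hypothetical. The only format keyword taken from this post is JSON, so look up the keywords for other formats, and the matching *.converter settings, in the Lenses documentation.

```shell
# Sketch only: build_sink_kcql is a hypothetical helper, not part of the connector.
# Arguments: <bucket> <prefix> <topic> <format keyword>
build_sink_kcql() {
  bucket=$1; prefix=$2; topic=$3; format=$4
  # Single quotes keep the backticks around the format keyword literal.
  printf 'INSERT INTO %s:%s SELECT * FROM %s PARTITIONBY _header.year,_header.month,_header.day,_header.hour STOREAS `%s` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5\n' \
    "$bucket" "$prefix" "$topic" "$format"
}

# Example with an assumed bucket name.
build_sink_kcql my-backup-bucket my_workload source_topic JSON
```

The function only formats a string; it does not check that the format keyword is one the connector accepts.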

Follow these steps to back up your topic:

  1. Create a new Amazon MSK sink connector by running the following command:
    aws kafkaconnect create-connector \
    --capacity "autoScaling={maxWorkerCount=2,mcuCount=1,minWorkerCount=1,scaleInPolicy={cpuUtilizationPercentage=10},scaleOutPolicy={cpuUtilizationPercentage=80}}" \
    --connector-configuration \
    "connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector, \
    key.converter.schemas.enable=false, \
    connect.s3.kcql=INSERT INTO <<S3 Bucket Name>>:my_workload SELECT * FROM source_topic PARTITIONBY _header.year,_header.month,_header.day,_header.hour STOREAS \`JSON\` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5, \
    aws.region=us-east-1, \
    tasks.max=2, \
    topics=source_topic, \
    schema.enable=false, \
    errors.log.enable=true, \
    value.converter=org.apache.kafka.connect.storage.StringConverter, \
    key.converter=org.apache.kafka.connect.storage.StringConverter " \
    --connector-name "backup-msk-to-s3-v1" \
    --kafka-cluster '{"apacheKafkaCluster": {"bootstrapServers": "<<MSK broker list>>","vpc": {"securityGroups": [ <<Security Group>> ],"subnets": [ <<Subnet List>> ]}}}' \
    --kafka-cluster-client-authentication "authenticationType=NONE" \
    --kafka-cluster-encryption-in-transit "encryptionType=PLAINTEXT" \
    --kafka-connect-version "2.7.1" \
    --plugins "customPlugin={customPluginArn=<< ARN of the MSK Connect Plugin >>,revision=1}" \
    --service-execution-role-arn " <<ARN of the IAM Role>> "

  2. Send data to the topic using kcat:
    ./kcat -b <<broker list>> -t source_topic -H "year=$(date +"%Y")" -H "month=$(date +"%m")" -H "day=$(date +"%d")" -H "hour=$(date +"%H")" -P
    {"message":"interesset eros vel elit salutatus"}
    {"message":"impetus deterruisset per aliquam luctus"}
    {"message":"ridens vocibus feugait vitae cras"}
    {"message":"interesset eros vel elit salutatus"}
    {"message":"impetus deterruisset per aliquam luctus"}
    {"message":"ridens vocibus feugait vitae cras"}

  3. Check the S3 bucket to make sure the data is being written.

MSK Connect publishes metrics to Amazon CloudWatch that you can use to monitor your backup process. Important metrics are SinkRecordReadRate and SinkRecordSendRate, which measure the average number of records read from Kafka and written to Amazon S3, respectively.

Also, make sure that the backup connector is keeping up with the rate at which the Kafka topic is receiving messages by monitoring the offset lag of the connector. If you’re using Amazon MSK, you can do this by turning on partition-level metrics on Amazon MSK and monitoring the OffsetLag metric of all the partitions for the backup connector’s consumer group. You should keep this as close to 0 as possible by adjusting the maximum number of MSK Connect worker instances. The command that we used in the previous step sets MSK Connect to automatically scale up to two workers. Adjust the --capacity setting to increase or decrease the maximum worker count of MSK Connect workers based on the OffsetLag metric.
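As a rough illustration of adjusting --capacity, the following shell sketch produces the same autoscaling string used in the create-connector command, with the worker counts as parameters. The helper name build_capacity is hypothetical. The MCU count and CPU thresholds are copied from the command in this post and are not tuned recommendations.

```shell
# Sketch only: build_capacity is a hypothetical helper.
# Arguments: <min workers> <max workers>
build_capacity() {
  min_workers=$1; max_workers=$2
  # Reject an inverted range before emitting anything.
  if [ "$min_workers" -gt "$max_workers" ]; then
    echo "min workers must not exceed max workers" >&2
    return 1
  fi
  printf 'autoScaling={maxWorkerCount=%s,mcuCount=1,minWorkerCount=%s,scaleInPolicy={cpuUtilizationPercentage=10},scaleOutPolicy={cpuUtilizationPercentage=80}}\n' \
    "$max_workers" "$min_workers"
}

# Example: allow the connector to scale between 1 and 4 workers.
build_capacity 1 4
```

You could pass the output as the --capacity value when creating the connector, for example --capacity "$(build_capacity 1 4)". For a connector that already exists, aws kafkaconnect update-connector also takes a --capacity argument.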

Restore data to your topics

You can restore your backed-up data to a new topic with the same name in the same Kafka cluster, a different topic in the same Kafka cluster, or a different topic in a different Kafka cluster altogether. In this post, we walk through the scenario of restoring data that was backed up in Amazon S3 to a different topic, target_topic, in the same Kafka cluster. You can extend this to other scenarios by changing the topic and broker details in the connector configuration.

Follow these steps to restore the data:

  1. Create an Amazon MSK source connector by running the following command:
    aws kafkaconnect create-connector \
    --capacity "autoScaling={maxWorkerCount=2,mcuCount=1,minWorkerCount=1,scaleInPolicy={cpuUtilizationPercentage=10},scaleOutPolicy={cpuUtilizationPercentage=80}}" \
    --connector-configuration \
        "connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector, \
         key.converter.schemas.enable=false, \
         connect.s3.kcql=INSERT INTO target_topic SELECT * FROM <<S3 Bucket Name>>:my_workload PARTITIONBY _header.year,_header.month,_header.day,_header.hour STOREAS \`JSON\` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5 , \
         aws.region=us-east-1, \
         tasks.max=2, \
         topics=target_topic, \
         schema.enable=false, \
         errors.log.enable=true, \
         value.converter=org.apache.kafka.connect.storage.StringConverter, \
         key.converter=org.apache.kafka.connect.storage.StringConverter " \
    --connector-name "restore-s3-to-msk-v1" \
    --kafka-cluster '{"apacheKafkaCluster": {"bootstrapServers": "<<MSK broker list>>","vpc": {"securityGroups": [<<Security Group>>],"subnets": [ <<Subnet List>> ]}}}' \
    --kafka-cluster-client-authentication "authenticationType=NONE" \
    --kafka-cluster-encryption-in-transit "encryptionType=PLAINTEXT" \
    --kafka-connect-version "2.7.1" \
    --plugins "customPlugin={customPluginArn=<< ARN of the MSK Connect Plugin >>,revision=1}" \
    --service-execution-role-arn " <<ARN of the IAM Role>> "

The connector reads the data from the S3 bucket and replays it back to target_topic.

  2. Verify that the data is being written to the Kafka topic by running the following command:
    ./kafka-console-consumer.sh --bootstrap-server <<MSK broker list>> --topic target_topic --from-beginning

MSK Connect connectors run indefinitely, waiting for new data to be written to the source. However, while restoring, you have to stop the connector after all the data is copied to the topic. MSK Connect publishes the SourceRecordPollRate and SourceRecordWriteRate metrics to CloudWatch, which measure the average number of records polled from Amazon S3 and the number of records written to the Kafka cluster, respectively. You can monitor these metrics to track the status of the restore process. When these metrics reach 0, the data from Amazon S3 is restored to the target_topic. You can get notified of the completion by setting up a CloudWatch alarm on these metrics. You can extend the automation to invoke an AWS Lambda function that deletes the connector when the restore is complete.
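The completion check described above can be sketched in shell as a test that every recent metric datapoint is zero. The helper name all_zero is hypothetical, and the sample values are made up for illustration. The commented CloudWatch call assumes the metrics are published under the AWS/KafkaConnect namespace with a ConnectorName dimension, so confirm both in your CloudWatch console before relying on it.

```shell
# Sketch only: all_zero is a hypothetical helper.
# Reads one number per line on stdin; succeeds only if there is at least
# one value and every value equals 0.
all_zero() {
  awk 'NF { n++; if ($1 + 0 != 0) bad = 1 }
       END { if (n > 0 && bad == 0) exit 0; exit 1 }'
}

# Example with made-up datapoints for SourceRecordWriteRate.
printf '0.0\n0.0\n0.0\n' | all_zero && echo "restore looks complete"

# Assumed way to feed real datapoints (namespace and dimension are assumptions):
# aws cloudwatch get-metric-statistics --namespace AWS/KafkaConnect \
#   --metric-name SourceRecordWriteRate \
#   --dimensions Name=ConnectorName,Value=restore-s3-to-msk-v1 \
#   --start-time <<start>> --end-time <<end>> --period 60 --statistics Average \
#   --query 'Datapoints[].Average' --output text | tr '\t' '\n' | all_zero
```

A zero rate can also appear before the connector has started reading, so check that the metrics were non-zero earlier in the window before treating zero as completion.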

As with the backup process, you can speed up the restore process by scaling out the number of MSK Connect workers. Change the --capacity parameter to adjust the maximum and minimum workers to a number that meets the restore SLAs of your workload.

Reset consumer offsets

Depending on the requirements of restoring the data to a new Kafka topic, you may also need to reset the offsets of the consumer group before consuming from the topic. Identifying the actual offset that you want to reset to depends on your specific business use case and involves manual work. You can use tools like Amazon S3 Select, Athena, or other custom tools to inspect the objects. The following screenshot demonstrates reading the records ending at offset 14 of partition 2 of topic source_topic using S3 Select.

After you identify the new start offsets for your consumer groups, you have to reset them on your Kafka cluster. You can do this using the CLI tools that come bundled with Kafka.

Existing consumer groups

If you want to use the same consumer group name after restoring the topic, you can do this by running the following command for each partition of the restored topic:

./kafka-consumer-groups.sh --bootstrap-server <<broker list>> --group <<consumer group>> --topic target_topic:<<partition>> --to-offset <<desired offset>> --reset-offsets --execute

Verify this by running the --describe option of the command:

./kafka-consumer-groups.sh --bootstrap-server <<broker list>> --group <<consumer group>>  --describe
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        ...
source_topic  0          211006          188417765       188206759  ...
source_topic  1          212847          192997707       192784860  ...
source_topic  2          211147          196410627       196199480  ...
target_topic  0          211006          188417765       188206759  ...
target_topic  1          212847          192997707       192784860  ...
target_topic  2          211147          196410627       196199480  ...
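Because the reset command has to be run once per partition, a small loop can generate the commands from a list of partition:offset pairs. This is a sketch under stated assumptions: print_reset_commands is a hypothetical helper, the broker address and group name are placeholders, and the offsets are illustrative values taken from the sample output above. It prints the commands rather than running them, so you can review them first.

```shell
# Sketch only: print_reset_commands is a hypothetical helper.
# Arguments: <broker list> <consumer group> <topic> <partition:offset>...
print_reset_commands() {
  brokers=$1; group=$2; topic=$3
  shift 3
  for pair in "$@"; do
    partition=${pair%%:*}   # text before the first colon
    offset=${pair#*:}       # text after the first colon
    printf './kafka-consumer-groups.sh --bootstrap-server %s --group %s --topic %s:%s --to-offset %s --reset-offsets --execute\n' \
      "$brokers" "$group" "$topic" "$partition" "$offset"
  done
}

# Example with placeholder broker and group names.
print_reset_commands "broker1:9092" "my-group" "target_topic" 0:211006 1:212847 2:211147
```

Offsets can only be reset while the consumer group has no active members, so stop the consumers before running the generated commands.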

New consumer group

If you want your workload to create a new consumer group and seek to custom offsets, you can do this by invoking the seek method in your Kafka consumer for each partition. Alternatively, you can create the new consumer group by running the following code:

./kafka-console-consumer.sh --bootstrap-server <<broker list>> --topic target_topic --group <<consumer group>> --from-beginning --max-messages 1

Reset the offset to the desired offsets for each partition by running the following command:

./kafka-consumer-groups.sh --bootstrap-server <<broker list>> --group <<New consumer group>> --topic target_topic:<<partition>> --to-offset <<desired offset>> --reset-offsets --execute

Clean up

To avoid incurring ongoing charges, complete the following cleanup steps:

  1. Delete the MSK Connect connectors and plugin.
  2. Delete the MSK cluster.
  3. Delete the S3 buckets.
  4. Delete any CloudWatch resources you created.

Conclusion

In this post, we showed you how to back up and restore Kafka topic data using MSK Connect. You can extend this solution to multiple topics and other data formats based on your workload. Be sure to test various scenarios that your workloads may face and document the runbook for each of those scenarios.

About the Author

Rakshith Rao is a Senior Solutions Architect at AWS. He works with AWS’s strategic customers to build and operate their key workloads on AWS.
