Kafka入门:Java客户端库的使用

news/2024/11/17 2:17:53 标签: kafka, java, 分布式

在现代的分布式系统中,消息队列扮演着至关重要的角色,而Apache Kafka以其高吞吐量、可扩展性和容错性而广受欢迎。本文将带你了解如何使用Kafka的Java客户端库来实现生产者(Producer)和消费者(Consumer)的基本操作。

Kafka简介

Apache Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流式应用程序。它具有高吞吐量、可扩展性和容错性,适用于处理实时数据。

环境准备

在开始之前,请确保你已经安装了以下环境:

  1. Java开发环境(JDK)
  2. Kafka服务器(可以是本地安装或远程服务器)
  3. Kafka客户端库(通常通过Maven或Gradle引入)

Maven依赖

如果你使用Maven来管理项目依赖,可以在pom.xml文件中添加以下依赖:

<dependencies>
    <!-- Kafka客户端依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.2.0</version>
    </dependency>
</dependencies>

Kafka生产者(Producer)

生产者负责将消息发送到Kafka的特定主题(Topic)。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // 配置属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建生产者
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 创建消息
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");

        // 发送消息
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("Message sent successfully");
            }
        });

        // 关闭生产者
        producer.close();
    }
}

Kafka消费者(Consumer)

消费者负责从Kafka的特定主题(Topic)接收消息。

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 配置属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");

        // 创建消费者
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("test-topic"));

        // 消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

运行截图

生产者:

消费者:

总结

通过上述示例,我们了解了如何使用Kafka的Java客户端库来创建生产者和消费者。这些基本操作是构建基于Kafka的分布式应用的基石。Kafka的强大功能远不止于此,包括但不限于消息持久化、分区、复制等高级特性,这些都需要在实际项目中根据具体需求进行深入学习和应用。

希望这篇文章能帮助你快速入门Kafka的Java客户端库使用。如果你有任何问题或需要进一步的指导,请随时留言讨论。


http://www.niftyadmin.cn/n/5754807.html

相关文章

C++常用的特性-->day05

友元的拓展语法 声明一个类为另外一个类的友元时&#xff0c;不再需要使用class关键字&#xff0c;并且还可以使用类的别名&#xff08;使用 typedef 或者 using 定义&#xff09;。 #include <iostream> using namespace std;// 类声明 class Tom; // 定义别名 using …

生成式GPT商品推荐:精准满足用户需求

生成式GPT商品推荐&#xff1a;精准满足用户需求 随着人工智能&#xff08;AI&#xff09;技术的飞速发展&#xff0c;电商平台正在逐步迎来一场前所未有的变革。尤其是生成式GPT&#xff08;Generative Pre-trained Transformer&#xff09;技术的应用&#xff0c;正在重新定…

坚持燃油新能源双赛道发力,MG ES5MG7 2025款亮相广州车展

11月15日&#xff0c;第22届广州车展正式启幕&#xff0c;上汽MG名爵携“B级车王炸”MG7 2025款、全球高标准纯电后驱SUV MG ES5、“A级轿跑天花板”新一代MG5等明星阵容重磅登场&#xff0c;并通过一系列技术创新成果及精彩活动展示&#xff0c;尽显百年品牌独特魅力与强大实力…

tensorflow案例6--基于VGG16的猫狗识别(准确率99.8%+),以及tqdm、train_on_batch的简介

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 前言 本次还是学习API和如何搭建神经网络为主&#xff0c;这一次用VGG16去对猫狗分类&#xff0c;效果还是很好的&#xff0c;达到了99.8% 文章目录 1、tqdm…

kubesphere环境-本地Harbor仓库+k8s集群(单master 多master)+Prometheus监控平台部署

前言&#xff1a;半月前在公司生产环境上离线部署了k8s集群Victoria Metrics(二开版)自研版夜莺 监控平台的搭建&#xff0c;下面我租用3台华为云服务器演示部署kubesphere环境-本地Harbor仓库k8s集群&#xff08;单master节点 & 单master节点&#xff09;Prometheus监控部…

html + css 自适应首页布局案例

文章目录 前言一、组成二、代码1. css 样式2. body 内容3.全部整体 三、效果 前言 一个自适应的html布局 一、组成 整体居中&#xff0c;宽度1200px&#xff0c;小屏幕宽度100% 二、代码 1. css 样式 代码如下&#xff08;示例&#xff09;&#xff1a; <style>* {…

【IEEE出版 | 中国石油大学(华东)主办】第六届信息与计算机前沿术国际学术会议(ICFTIC 2024,12月13-15日)

第六届信息与计算机前沿术国际学术会议(ICFTIC 2024) 2024 6th International Conference on Frontier Technologies of Information and Computer 官方信息 会议官网&#xff1a;WWW.ICFTIC.ORG 2024 6th International Conference on Frontier Technologies of Information…

网络编程套接字2

之前我们已经介绍了UDP套接字流程&#xff0c;接下来我们介绍TCP流套接字编程&#xff0c;TCP的一个核心特点&#xff0c;面向字节流&#xff0c;读写数据的基本单位就是字节。 1.API介绍 1.1ServerSocket:是创建TCP服务器Socket的API&#xff08;专门给服务器用&#xff09;…