下面是Java将CSV的数据发送到kafka的示例的详细攻略:
准备工作
首先,在本地安装kafka和创建一个名为test
的topic。同时,在项目中引入以下依赖库:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>5.2</version>
</dependency>
</dependencies>
示例1:使用单个线程发送数据
下面是示例代码,其使用单个线程从CSV文件中读取数据并将其发送到kafka中。注意,这里使用的是ProducerRecord
,而非Producer
类,因为后者不支持单个线程发送消息。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.opencsv.CSVReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
public class CsvToKafkaSingleThread {
public static void main(String[] args) {
// 配置kafka生产者
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 读取csv文件并将数据发送到kafka
try (CSVReader csvReader = new CSVReader(
new InputStreamReader(new FileInputStream("example.csv"), StandardCharsets.UTF_8)
)) {
String[] nextLine;
while ((nextLine = csvReader.readNext()) != null) {
String key = nextLine[0];
String value = String.join(",", nextLine);
ProducerRecord<String, String> record = new ProducerRecord<>("test", key, value);
producer.send(record);
Thread.sleep(10); // 防止发送速度过快
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
这个示例中使用了opencsv
库来读取CSV文件,同时使用了Thread.sleep
方法来控制发送速度,防止过快的发送导致kafka出现负载过高的情况。当然,在实际生产环境中,发送速度还需要通过代码调优来实现最佳状态。
示例2:使用线程池发送数据
示例代码如下,这里使用线程池,采用多线程方式发送数据,从而提高发送速度。
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.opencsv.CSVReader;
public class CsvToKafkaMultiThread {
public static void main(String[] args) throws Exception {
// 配置kafka生产者
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 读取csv文件并将数据发送到kafka
int numThreads = 10;
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
try (CSVReader csvReader = new CSVReader(
new InputStreamReader(new FileInputStream("example.csv"), StandardCharsets.UTF_8)
)) {
String[] nextLine;
while ((nextLine = csvReader.readNext()) != null) {
String key = nextLine[0];
String value = String.join(",", nextLine);
ProducerRecord<String, String> record = new ProducerRecord<>("test", key, value);
executor.submit(() -> {
producer.send(record);
});
}
} catch (IOException e) {
e.printStackTrace();
} finally {
executor.shutdown();
producer.close();
executor.awaitTermination(10, TimeUnit.MINUTES);
}
}
}
这个示例中使用了Java中的线程池技术,通过ExecutorService
和Executors
来创建线程池和线程实例,对CSV数据进行分批发送。在实际生产环境中,我们可以根据实际情况调整线程池的大小和文件读取的缓冲区大小等参数,以达到最佳的发送性能。同时,在发送大量数据时,我们还可以使用kafka提供的批量发送API,进一步提高性能。
希望上述两个示例能够对您理解Java将CSV的数据发送到kafka的过程和方法有所帮助。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java将CSV的数据发送到kafka的示例 - Python技术站