import kafka.api.FetchRequest;import kafka.api.FetchRequestBuilder;import kafka.api.PartitionOffsetRequestInfo;import kafka.common.ErrorMapping;import kafka.common.TopicAndPartition;import kafka.javaapi.*;import kafka.javaapi.consumer.SimpleConsumer;import kafka.message.MessageAndOffset;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.charset.Charset;import java.nio.charset.CharsetDecoder;import java.util.ArrayList;import java.util.Collections;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;@Servicepublic class KafkaSimpleConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaSimpleConsumer.class); private Listm_replicaBrokers = new ArrayList (); private List m_replicaPorts = new ArrayList (); private ExecutorService executor=null; @PostConstruct public void start() { // Topic to read from String topic = "page_visits"; // One broker to use for Metadata lookup List seeds = new ArrayList (); seeds.add("192.168.137.176"); // Port the brokers listen on List ports = new ArrayList (); ports.add(9092); try { int partitions = getPartitions(seeds,ports,topic); executor = Executors.newFixedThreadPool(partitions); for (int part=0;part>partitions;part++){ executor.submit(new SimpleKafkaConsumerProcesser(this,topic,part,seeds, ports)); } } catch (Exception e) { logger.error("Oops:{}", e); e.printStackTrace(); } } @PreDestroy public void close(){ try { if (executor != null) { executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS); logger.info("shutdown KafkaSimpleConsumer successfully"); executor=null; } } catch (Exception e) { logger.warn("shutdown KafkaSimpleConsumer failed", e); } } public String getString(ByteBuffer buffer) { Charset charset = null; CharsetDecoder decoder = null; CharBuffer charBuffer = null; try { charset = Charset.forName("UTF-8"); decoder = charset.newDecoder(); // charBuffer = decoder.decode(buffer);//用这个的话,只能输出来一次结果,第二次显示为空 charBuffer = decoder.decode(buffer.asReadOnlyBuffer()); return charBuffer.toString(); } catch (Exception ex) { ex.printStackTrace(); return ""; } } public void run(String a_topic, int a_partition, List a_seedBrokers, List a_ports) throws Exception { // find the meta data about the topic and partition we are interested in // PartitionMetadata metadata = findLeader(a_seedBrokers, a_ports, a_topic, a_partition); if (metadata == null) { logger.error("Can't find metadata for Topic and Partition. Exiting"); return; } if (metadata.leader() == null) { logger.error("Can't find Leader for Topic and Partition. Exiting"); return; } String leadBroker = metadata.leader().host(); int a_port = metadata.leader().port(); String clientName = "Client_" + a_topic + "_" + a_partition; SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); // kafka.api.OffsetRequest.EarliestTime() finds the beginning of the // data in the logs and starts streaming from there long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName); int numErrors = 0; boolean isRunning=true; while (isRunning) { if (consumer == null) { consumer = new SimpleConsumer(leadBroker, a_port, 100000,64 * 1024, clientName); } // Note: this fetchSize of 100000 might need to be increased if // large batches are written to Kafka FetchRequest req = new FetchRequestBuilder().clientId(clientName) .addFetch(a_topic, a_partition, readOffset, 100000).build(); FetchResponse fetchResponse = consumer.fetch(req); // Identify and recover from leader changes if (fetchResponse.hasError()) { numErrors++; // Something went wrong! short code = fetchResponse.errorCode(a_topic, a_partition); logger.error("Error fetching data from the Broker:{} Reason: ",leadBroker, code); if (numErrors > 5) break; if (code == ErrorMapping.OffsetOutOfRangeCode()) { // We asked for an invalid offset. For simple case ask for // the last element to reset readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName); continue; } consumer.close(); consumer = null; // 查找新的leader metadata = findNewLeader(leadBroker, a_topic, a_partition, a_port); leadBroker = metadata.leader().host(); a_port = metadata.leader().port(); continue; } numErrors = 0; // Fetch the data long numRead = 0; for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) { long currentOffset = messageAndOffset.offset(); // This is needed since if Kafka is compressing the // messages, // the fetch request will return an entire compressed block // even if the requested offset isn't the beginning of the // compressed block. if (currentOffset < readOffset) { logger.error("Found an old offset:{} Expecting: ", currentOffset, readOffset); continue; } readOffset = messageAndOffset.nextOffset(); ByteBuffer payload = messageAndOffset.message().payload(); byte[] bytes = new byte[payload.limit()]; payload.get(bytes); logger.error("{}: {}", String.valueOf(messageAndOffset.offset()), new String(bytes, "UTF-8")); numRead++; consumer.commitOffsets(request) } // If we didn't read anything on the last request we go to sleep for // a second so we aren't hammering Kafka when there is no data. if (numRead == 0) { try { Thread.sleep(1000); } catch (InterruptedException ie) { logger.error("InterruptedException:{}",ie); if (consumer != null) consumer.close(); } } } } public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map requestInfo = new HashMap (); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { logger.error("Error fetching data Offset Data the Broker. Reason:{}", response.errorCode(topic, partition)); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; } private PartitionMetadata findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_oldLeader_port) throws Exception { for (int i = 0; i < 3; i++) { boolean goToSleep = false; PartitionMetadata metadata = findLeader(m_replicaBrokers,m_replicaPorts, a_topic, a_partition); if (metadata == null) { goToSleep = true; } else if (metadata.leader() == null) { goToSleep = true; } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && a_oldLeader_port == metadata.leader().port() && i == 0) { // first time through if the leader hasn't changed, give // ZooKeeper a second to recover // second time, assume the broker did recover before failover, // or it was a non-Broker issue // goToSleep = true; } else { return metadata; } if (goToSleep) { try { Thread.sleep(1000); } catch (InterruptedException ie) { logger.error("findLeader,topic={},partition={},{}", a_topic, a_partition, ie); } } } logger.error("Unable to find new leader after Broker failure. Exiting"); throw new Exception("Unable to find new leader after Broker failure. Exiting"); } private int getPartitions(List a_seedBrokers,List a_port, String a_topic) { int count=0; loop: for (int i = 0; i < a_seedBrokers.size(); i++) { String seed = a_seedBrokers.get(i); SimpleConsumer consumer = null; try { consumer = new SimpleConsumer(seed, a_port.get(i), 100000, 4 * 1024, "getPartitions"); List topics = Collections.singletonList(a_topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List metaData = resp.topicsMetadata(); for (TopicMetadata item : metaData) { count=item.partitionsMetadata().size(); break loop; } } catch (Exception e) { logger.error("getPartitions{},{},{}", seed, a_topic, e); } finally { if (consumer != null) consumer.close(); } } return count; } private PartitionMetadata findLeader(List a_seedBrokers, List a_port, String a_topic, int a_partition) { PartitionMetadata returnMetaData = null; loop: for (int i = 0; i < a_seedBrokers.size(); i++) { String seed = a_seedBrokers.get(i); SimpleConsumer consumer = null; try { consumer = new SimpleConsumer(seed, a_port.get(i), 100000, 64 * 1024, "leaderLookup"); List topics = Collections.singletonList(a_topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List metaData = resp.topicsMetadata(); for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) { if (part.partitionId() == a_partition) { returnMetaData = part; break loop; } } } } catch (Exception e) { logger.error("findLeader,seed={},topic={},partition={},{}", seed, a_topic, a_partition, e); } finally { if (consumer != null) consumer.close(); } } if (returnMetaData != null) { m_replicaBrokers.clear(); m_replicaPorts.clear(); for (kafka.cluster.Broker replica : returnMetaData.replicas()) { m_replicaBrokers.add(replica.host()); m_replicaPorts.add(replica.port()); } } return returnMetaData; }} class SimpleKafkaConsumerProcesser implements Runnable { private static final Logger logger = LoggerFactory.getLogger(KafkaSimpleConsumer.class); KafkaSimpleConsumer consumer; String a_topic; int a_partition; List a_seedBrokers; List a_ports; public SimpleKafkaConsumerProcesser(KafkaSimpleConsumer consumer, String a_topic, int a_partition, List a_seedBrokers, List a_ports) { this.consumer=consumer; this.a_topic=a_topic; this.a_partition=a_partition; this.a_seedBrokers=a_seedBrokers; this.a_ports=a_ports; } @Override public void run() { for(;;){ try { consumer.run(a_topic, a_partition, a_seedBrokers, a_ports); } catch (Exception e) { logger.error("SimpleKafkaConsumerProcesser Oops:{}", e); e.printStackTrace(); } } }}