Ways to Consume from Kafka -- the low-level SimpleConsumer
Published: 2019-06-19



The complete example below is a Spring @Service that looks up the partition count for a topic and then starts one low-level SimpleConsumer worker thread per partition, handling leader discovery and leader failover by hand.

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class KafkaSimpleConsumer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaSimpleConsumer.class);

    private List<String> m_replicaBrokers = new ArrayList<String>();
    private List<Integer> m_replicaPorts = new ArrayList<Integer>();

    private ExecutorService executor = null;

    @PostConstruct
    public void start() {
        // Topic to read from
        String topic = "page_visits";
        // One broker to use for metadata lookup
        List<String> seeds = new ArrayList<String>();
        seeds.add("192.168.137.176");
        // Port the brokers listen on
        List<Integer> ports = new ArrayList<Integer>();
        ports.add(9092);

        try {
            int partitions = getPartitions(seeds, ports, topic);
            executor = Executors.newFixedThreadPool(partitions);

            // Start one worker thread per partition
            for (int part = 0; part < partitions; part++) {
                executor.submit(new SimpleKafkaConsumerProcesser(this, topic, part, seeds, ports));
            }
        } catch (Exception e) {
            logger.error("Oops:{}", e);
        }
    }

    @PreDestroy
    public void close() {
        try {
            if (executor != null) {
                executor.shutdown();
                executor.awaitTermination(5, TimeUnit.SECONDS);
                logger.info("shutdown KafkaSimpleConsumer successfully");
                executor = null;
            }
        } catch (Exception e) {
            logger.warn("shutdown KafkaSimpleConsumer failed", e);
        }
    }

    public String getString(ByteBuffer buffer) {
        try {
            Charset charset = Charset.forName("UTF-8");
            CharsetDecoder decoder = charset.newDecoder();
            // charBuffer = decoder.decode(buffer);
            // Decoding the buffer directly consumes it, so only the first call returns data
            // and later calls come back empty; decode a read-only copy instead.
            CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            return charBuffer.toString();
        } catch (Exception ex) {
            ex.printStackTrace();
            return "";
        }
    }

    public void run(String a_topic, int a_partition,
            List<String> a_seedBrokers, List<Integer> a_ports) throws Exception {
        // Find the metadata about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_ports, a_topic, a_partition);
        if (metadata == null) {
            logger.error("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            logger.error("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        int a_port = metadata.leader().port();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        // kafka.api.OffsetRequest.EarliestTime() finds the beginning of the
        // data in the logs and starts streaming from there
        long readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        boolean isRunning = true;
        while (isRunning) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            // Note: this fetchSize of 100000 might need to be increased if
            // large batches are written to Kafka
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            // Identify and recover from leader changes
            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                logger.error("Error fetching data from the Broker:{} Reason:{}", leadBroker, code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For the simple case,
                    // ask for the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                // Look up the new leader
                metadata = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                leadBroker = metadata.leader().host();
                a_port = metadata.leader().port();
                continue;
            }
            numErrors = 0;

            // Fetch the data
            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                // This is needed since if Kafka is compressing the messages,
                // the fetch request will return an entire compressed block
                // even if the requested offset isn't the beginning of the
                // compressed block.
                if (currentOffset < readOffset) {
                    logger.error("Found an old offset:{} Expecting:{}", currentOffset, readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                logger.error("{}: {}", String.valueOf(messageAndOffset.offset()), new String(bytes, "UTF-8"));
                numRead++;
                // Note: SimpleConsumer does not commit offsets automatically;
                // the application has to track readOffset itself.
            }

            // If we didn't read anything on the last request, sleep for a
            // second so we aren't hammering Kafka when there is no data.
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    logger.error("InterruptedException:{}", ie);
                    if (consumer != null)
                        consumer.close();
                }
            }
        }
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic,
            int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            logger.error("Error fetching Offset Data from the Broker. Reason:{}",
                    response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private PartitionMetadata findNewLeader(String a_oldLeader, String a_topic,
            int a_partition, int a_oldLeader_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, m_replicaPorts, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
                    && a_oldLeader_port == metadata.leader().port() && i == 0) {
                // First time through, if the leader hasn't changed, give
                // ZooKeeper a second to recover.
                // Second time, assume the broker did recover before failover,
                // or it was a non-broker issue.
                goToSleep = true;
            } else {
                return metadata;
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    logger.error("findLeader,topic={},partition={},{}", a_topic, a_partition, ie);
                }
            }
        }
        logger.error("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private int getPartitions(List<String> a_seedBrokers, List<Integer> a_port, String a_topic) {
        int count = 0;
        loop: for (int i = 0; i < a_seedBrokers.size(); i++) {
            String seed = a_seedBrokers.get(i);
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port.get(i), 100000, 4 * 1024, "getPartitions");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    count = item.partitionsMetadata().size();
                    break loop;
                }
            } catch (Exception e) {
                logger.error("getPartitions,seed={},topic={},{}", seed, a_topic, e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        return count;
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers,
            List<Integer> a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (int i = 0; i < a_seedBrokers.size(); i++) {
            String seed = a_seedBrokers.get(i);
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port.get(i), 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                logger.error("findLeader,seed={},topic={},partition={},{}",
                        seed, a_topic, a_partition, e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            m_replicaPorts.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
                m_replicaPorts.add(replica.port());
            }
        }
        return returnMetaData;
    }
}

class SimpleKafkaConsumerProcesser implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(KafkaSimpleConsumer.class);

    KafkaSimpleConsumer consumer;
    String a_topic;
    int a_partition;
    List<String> a_seedBrokers;
    List<Integer> a_ports;

    public SimpleKafkaConsumerProcesser(KafkaSimpleConsumer consumer, String a_topic, int a_partition,
            List<String> a_seedBrokers, List<Integer> a_ports) {
        this.consumer = consumer;
        this.a_topic = a_topic;
        this.a_partition = a_partition;
        this.a_seedBrokers = a_seedBrokers;
        this.a_ports = a_ports;
    }

    @Override
    public void run() {
        for (;;) {
            try {
                consumer.run(a_topic, a_partition, a_seedBrokers, a_ports);
            } catch (Exception e) {
                logger.error("SimpleKafkaConsumerProcesser Oops:{}", e);
            }
        }
    }
}
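If you want to exercise the class outside of a Spring container (where @PostConstruct and @PreDestroy would normally trigger start() and close()), a minimal sketch can drive the lifecycle by hand. The broker address and topic are the values hard-coded above; the demo class name and the 60-second run time are arbitrary choices for illustration, not part of the original code.

// Hypothetical driver: run KafkaSimpleConsumer without Spring, assuming
// broker 192.168.137.176:9092 and topic "page_visits" from the code above.
public class KafkaSimpleConsumerDemo {
    public static void main(String[] args) throws Exception {
        KafkaSimpleConsumer consumer = new KafkaSimpleConsumer();
        consumer.start();          // normally invoked by Spring via @PostConstruct
        Thread.sleep(60 * 1000L);  // let the per-partition worker threads fetch for a while
        consumer.close();          // normally invoked by Spring via @PreDestroy
    }
}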

 

Reposted from: https://my.oschina.net/u/778683/blog/1828564
