• 
    

      <address id="upfr9"><pre id="upfr9"><strike id="upfr9"></strike></pre></address>
      1. <address id="upfr9"><tr id="upfr9"></tr></address><dl id="upfr9"></dl>

        kafka客戶端的使用方法

        這篇文章主要介紹“kafka客戶端的使用方法”,在日常操作中,相信很多人在kafka客戶端的使用方法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”kafka客戶端的使用方法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

        站在用戶的角度思考問題,與客戶深入溝通,找到坡頭網(wǎng)站設計與坡頭網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗,讓設計與互聯(lián)網(wǎng)技術結合,創(chuàng)造個性化、用戶體驗好的作品,建站類型包括:做網(wǎng)站、成都網(wǎng)站制作、企業(yè)官網(wǎng)、英文網(wǎng)站、手機端網(wǎng)站、網(wǎng)站推廣、域名注冊、網(wǎng)絡空間、企業(yè)郵箱。業(yè)務覆蓋坡頭地區(qū)。

        kafka客戶端發(fā)布record(消息)到kafka集群。

        新的生產(chǎn)者是線程安全的,在線程之間共享單個生產(chǎn)者實例,通常單例比多個實例要快。

        一個簡單的例子,使用producer發(fā)送一個有序的key/value(鍵值對),放到java的main方法里就能直接運行,

        Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 1);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
         Producer<String, String> producer = new KafkaProducer<>(props);
         for(int i = 0; i < 100; i++)
             producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
        
         producer.close();

        生產(chǎn)者的緩沖空間池保留尚未發(fā)送到服務器的消息,后臺I/O線程負責將這些消息轉換成請求發(fā)送到集群。如果使用后不關閉生產(chǎn)者,則會泄露這些資源。

        send()方法是異步的,添加消息到緩沖區(qū)等待發(fā)送,并立即返回。生產(chǎn)者將單個的消息批量在一起發(fā)送來提高效率。

        ack是判別請求是否為完整的條件(就是是判斷是不是成功發(fā)送了)。我們指定了“all”將會阻塞消息,這種設置性能最低,但是是最可靠的。

        retries,如果請求失敗,生產(chǎn)者會自動重試,我們指定是0次,如果啟用重試,則會有重復消息的可能性。

        producer(生產(chǎn)者)緩存每個分區(qū)未發(fā)送的消息。緩存的大小是通過 batch.size 配置指定的。值較大的話將會產(chǎn)生更大的批。并需要更多的內(nèi)存(因為每個“活躍”的分區(qū)都有1個緩沖區(qū))。

        默認緩沖可立即發(fā)送,即便緩沖空間還沒有滿,但是,如果你想減少請求的數(shù)量,可以設置linger.ms大于0。這將指示生產(chǎn)者發(fā)送請求之前等待一段時間,希望更多的消息填補到未滿的批中。這類似于TCP的算法,例如上面的代碼段,可能100條消息在一個請求發(fā)送,因為我們設置了linger(逗留)時間為1毫秒,然后,如果我們沒有填滿緩沖區(qū),這個設置將增加1毫秒的延遲請求以等待更多的消息。需要注意的是,在高負載下,相近的時間一般也會組成批,即使是 linger.ms=0。在不處于高負載的情況下,如果設置比0大,以少量的延遲代價換取更少的,更有效的請求。

        buffer.memory 控制生產(chǎn)者可用的緩存總量,如果消息發(fā)送速度比其傳輸?shù)椒掌鞯目欤瑢谋M這個緩存空間。當緩存空間耗盡,其他發(fā)送調(diào)用將被阻塞,阻塞時間的閾值通過max.block.ms設定,之后它將拋出一個TimeoutException。

        key.serializervalue.serializer示例,將用戶提供的key和value對象ProducerRecord轉換成字節(jié),你可以使用附帶的ByteArraySerializaerStringSerializer處理簡單的string或byte類型。

        send()

        public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)

        異步發(fā)送一條消息到topic,并調(diào)用callback(當發(fā)送已確認)。

        send是異步的,并且一旦消息被保存在等待發(fā)送的消息緩存中,此方法就立即返回。這樣并行發(fā)送多條消息而不阻塞去等待每一條消息的響應。

        發(fā)送的結果是一個RecordMetadata,它指定了消息發(fā)送的分區(qū),分配的offset和消息的時間戳。如果topic使用的是CreateTime,則使用用戶提供的時間戳或發(fā)送的時間(如果用戶沒有指定指定消息的時間戳)如果topic使用的是LogAppendTime,則追加消息時,時間戳是broker的本地時間。

        由于send調(diào)用是異步的,它將為分配消息的此消息的RecordMetadata返回一個Future。如果future調(diào)用get(),則將阻塞,直到相關請求完成并返回該消息的metadata,或拋出發(fā)送異常。

        如果要模擬一個簡單的阻塞調(diào)用,你可以調(diào)用get()方法。

         byte[] key = "key".getBytes();
         byte[] value = "value".getBytes();
         ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
         producer.send(record).get();

        完全無阻塞的話,可以利用回調(diào)參數(shù)提供的請求完成時將調(diào)用的回調(diào)通知。

         ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
         producer.send(myRecord,
                       new Callback() {
                           public void onCompletion(RecordMetadata metadata, Exception e) {
                               if(e != null)
                                   e.printStackTrace();
                               System.out.println("The offset of the record we just sent is: " + metadata.offset());
                           }
                       });

        發(fā)送到同一個分區(qū)的消息回調(diào)保證按一定的順序執(zhí)行,也就是說,在下面的例子中 callback1 保證執(zhí)行 callback2 之前:

        producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
        producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);

        注意:callback一般在生產(chǎn)者的I/O線程中執(zhí)行,所以是相當?shù)目斓模駝t將延遲其他的線程的消息發(fā)送。如果你需要執(zhí)行阻塞或計算昂貴(消耗)的回調(diào),建議在callback主體中使用自己的Executor來并行處理。

        pecified by:

        send in interface Producer<K,V>

        Parameters:

        record - 發(fā)送的記錄(消息)
        callback - 用戶提供的callback,服務器來調(diào)用這個callback來應答結果(null表示沒有callback)。

        Throws:

        InterruptException - 如果線程在阻塞中斷。
        SerializationException - 如果key或value不是給定有效配置的serializers。
        TimeoutException - 如果獲取元數(shù)據(jù)或消息分配內(nèi)存話費的時間超過max.block.ms。
        KafkaException - Kafka有關的錯誤(不屬于公共API的異常)。

        到此,關于“kafka客戶端的使用方法”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

        當前名稱:kafka客戶端的使用方法
        網(wǎng)頁路徑:http://www.jbt999.com/article26/gseojg.html

        成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設計建站公司、網(wǎng)站排名網(wǎng)站維護、關鍵詞優(yōu)化

        廣告

        聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:[email protected]。內(nèi)容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

        商城網(wǎng)站建設

      2. 
        

          <address id="upfr9"><pre id="upfr9"><strike id="upfr9"></strike></pre></address>
          1. <address id="upfr9"><tr id="upfr9"></tr></address><dl id="upfr9"></dl>
            中文字幕第5页 | 女人香蕉网站。 | 在线 中文字幕 日韩 | 丁香五月乱伦 | 日韩欧美三级电影在线观看 | 五月777 | 噜噜吧噜噜久久综合 | 欧美一级特黄A片免费 | 一本无码一区二区三区 | 国产精品第二页 |