日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

udp怎么保证不丢包_在 Flink 算子中使用多线程如何保证不丢数据?

發(fā)布時(shí)間:2024/7/23 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 udp怎么保证不丢包_在 Flink 算子中使用多线程如何保证不丢数据? 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

分析痛點(diǎn)

筆者線上有一個(gè) Flink 任務(wù)消費(fèi) Kafka 數(shù)據(jù),將數(shù)據(jù)轉(zhuǎn)換后,在 Flink 的 Sink 算子內(nèi)部調(diào)用第三方 api 將數(shù)據(jù)上報(bào)到第三方的數(shù)據(jù)分析平臺。這里使用批量同步 api,即:每 50 條數(shù)據(jù)請求一次第三方接口,可以通過批量 api 來提高請求效率。由于調(diào)用的外網(wǎng)接口,所以每次調(diào)用 api 比較耗時(shí)。假如批次大小為 50,且請求接口的平均響應(yīng)時(shí)間為 50ms,使用同步 api,因此第一次請求響應(yīng)以后才會發(fā)起第二次請求。請求示意圖如下所示:

平均下來,每 50 ms 向第三方服務(wù)器發(fā)送 50 條數(shù)據(jù),也就是每個(gè)并行度 1 秒鐘處理 1000 條數(shù)據(jù)。假設(shè)當(dāng)前業(yè)務(wù)數(shù)據(jù)量為每秒 10 萬條數(shù)據(jù),那么 Flink Sink 算子的并行度需要設(shè)置為 100 才能正常處理線上數(shù)據(jù)。從 Flink 資源分配來講,100 個(gè)并行度需要申請 100 顆 CPU,因此當(dāng)前 Flink 任務(wù)需要占用集群中 100 顆 CPU 以及不少的內(nèi)存資源。請問此時(shí) Flink Sink 算子的 CPU 或者內(nèi)存壓力大嗎?

上述請求示意圖可以看出 Flink 任務(wù)發(fā)出請求到響應(yīng)這 50ms 期間,Flink Sink 算子只是在 wait,并沒有實(shí)質(zhì)性的工作。因此,CPU 使用率肯定很低,當(dāng)前任務(wù)的瓶頸明顯在網(wǎng)絡(luò) IO。最后結(jié)論是 Flink 任務(wù)申請了 100 顆 CPU,導(dǎo)致 yarn 或其他資源調(diào)度框架沒有資源了,但是這 100 顆 CPU 的使用率并不高,這里能不能優(yōu)化通過提高 CPU 的使用率,從而少申請一些 CPU 呢?

同步批量請求優(yōu)化為異步請求

首先可以想到的是將同步請求改為異步請求,使得任務(wù)不會阻塞在網(wǎng)絡(luò)請求這一環(huán)節(jié),請求示意圖如下所示。

異步請求相比同步請求而言,優(yōu)化點(diǎn)在于每次發(fā)出請求時(shí),不需要等待請求響應(yīng)后再發(fā)送下一次請求,而是當(dāng)下一批次的 50 條數(shù)據(jù)準(zhǔn)備好之后,直接向第三方服務(wù)器發(fā)送請求。每次發(fā)送請求后,Flink Sink 算子的客戶端需要注冊監(jiān)聽器來等待響應(yīng),當(dāng)響應(yīng)失敗時(shí)需要做重試或者回滾策略。

通過異步請求的方式,可以優(yōu)化網(wǎng)絡(luò)瓶頸,假如 Flink Sink 算子的單個(gè)并行度平均 10ms 接收到 50 條數(shù)據(jù),那么使用異步 api 的方式平均 1 秒可以處理 5000 條數(shù)據(jù),整個(gè) Flink 任務(wù)的性能提高了 5 倍。對于每秒 10 萬數(shù)據(jù)量的業(yè)務(wù),這里僅需要申請 20 顆 CPU 資源即可。關(guān)于異步 api 的具體使用,可以根據(jù)場景具體設(shè)計(jì),這里不詳細(xì)討論。

多線程 Client 模式

對于一些不支持異步 api 的場景,可能并不能使用上述優(yōu)化方案,同樣,為了提高 CPU 使用率,可以在 Flink Sink 端使用多線程的方案。如下圖所示,可以在 Flink Sink 端開啟 5 個(gè)請求第三方服務(wù)器的 Client 線程:Client1、Client2、Client3、Client4、Client5。

這五個(gè)線程內(nèi)分別使用同步批量請求的 Client,單個(gè) Client 還是保持 50 條記錄為一個(gè)批次,即 50 條記錄請求一次第三方 api。請求第三方 api 耗時(shí)主要在于網(wǎng)絡(luò) IO(性能瓶頸在于網(wǎng)絡(luò)請求延遲),因此如果變成 5 個(gè) Client 線程,每個(gè) Client 的單次請求平均耗時(shí)還能保持在 50ms,除非網(wǎng)絡(luò)請求已經(jīng)達(dá)到了帶寬上限或整個(gè)任務(wù)又遇到其他瓶頸。所以,多線程模式下使用同步批量 api 也能將請求效率提升 5 倍。

說明:多線程的方案,不僅限于請求第三方接口,對于非 CPU 密集型的任務(wù)也可以使用該方案,在降低 CPU 數(shù)量的同時(shí),單個(gè) CPU 承擔(dān)多個(gè)線程的工作,從而提高 CPU 利用率。例如:請求 HBase 的任務(wù)或磁盤 IO 是瓶頸的任務(wù),可以降低任務(wù)的并行度,使得每個(gè)并行度內(nèi)處理多個(gè)線程。

Flink 算子內(nèi)多線程實(shí)現(xiàn)

Sink 算子的單個(gè)并行度內(nèi)現(xiàn)在有 5 個(gè) Client 用于消費(fèi)數(shù)據(jù),但 Sink 算子的數(shù)據(jù)都來自于上游算子。如下圖所示,一個(gè)簡單的實(shí)現(xiàn)方式是 Sink 算子接收到上游數(shù)據(jù)后通過輪循或隨機(jī)的策略將數(shù)據(jù)分發(fā)給 5 個(gè) Client 線程。

但是輪循或者隨機(jī)策略會存在問題,假如 5 個(gè) Client 中 Client3 線程消費(fèi)較慢,會導(dǎo)致給 Client3 分發(fā)數(shù)據(jù)時(shí)被阻塞,從而使得其他正常消費(fèi)的線程 Client1、2、4、5 也被分發(fā)不到數(shù)據(jù)。

為了解決上述問題,可以在 Sink 算子內(nèi)申請一個(gè)數(shù)據(jù)緩沖隊(duì)列,隊(duì)列有先進(jìn)先出(FIFO)的特性。Sink 算子接收到的數(shù)據(jù)直接插入到隊(duì)列尾部,五個(gè) Client 線程不斷地從隊(duì)首取數(shù)據(jù)并消費(fèi),即:Sink 算子先接收的數(shù)據(jù) Client 先消費(fèi),后接收的數(shù)據(jù) Client 后消費(fèi)。

  • 若隊(duì)列一直是滿的,說明 Client 線程消費(fèi)較慢、Sink 算子上游生產(chǎn)數(shù)據(jù)較快。
  • 若隊(duì)列一直為空,說明 Client 線程消費(fèi)較快、Sink 算子的上游生產(chǎn)數(shù)據(jù)較慢。

五個(gè)線程共用同一個(gè)隊(duì)列完美地解決了單個(gè)線程消費(fèi)慢的問題,當(dāng) Client3 線程阻塞時(shí),不影響其他線程從隊(duì)列中消費(fèi)數(shù)據(jù)。這里使用隊(duì)列還起到了削峰填谷的作用。

代碼實(shí)現(xiàn)

原理明白了,具體代碼如下所示,首先是消費(fèi)數(shù)據(jù)的 Client 線程代碼,代碼邏輯很簡單,一直從 bufferQueue 中 poll 數(shù)據(jù),取出數(shù)據(jù)后,執(zhí)行相應(yīng)的消費(fèi)邏輯即可,在本案例中消費(fèi)邏輯便是 Client 積攢批次并調(diào)用第三方 api。

public class MultiThreadConsumerClient implements Runnable {private LinkedBlockingQueue<String> bufferQueue;public MultiThreadConsumerClient(LinkedBlockingQueue<String> bufferQueue) {this.bufferQueue = bufferQueue;}@Overridepublic void run() {String entity;while (true){// 從 bufferQueue 的隊(duì)首消費(fèi)數(shù)據(jù)entity = bufferQueue.poll();// 執(zhí)行 client 消費(fèi)數(shù)據(jù)的邏輯doSomething(entity);}}// client 消費(fèi)數(shù)據(jù)的邏輯private void doSomething(String entity) {// client 積攢批次并調(diào)用第三方 api} }

Sink 算子代碼如下所示,在 open 方法中需要初始化線程池、數(shù)據(jù)緩沖隊(duì)列并創(chuàng)建開啟消費(fèi)者線程,在 invoke 方法中只需要往 bufferQueue 的隊(duì)尾添加數(shù)據(jù)即可。

public class MultiThreadConsumerSink extends RichSinkFunction<String> {// Client 線程的默認(rèn)數(shù)量private final int DEFAULT_CLIENT_THREAD_NUM = 5;// 數(shù)據(jù)緩沖隊(duì)列的默認(rèn)容量private final int DEFAULT_QUEUE_CAPACITY = 5000;private LinkedBlockingQueue<String> bufferQueue;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// new 一個(gè)容量為 DEFAULT_CLIENT_THREAD_NUM 的線程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CLIENT_THREAD_NUM, DEFAULT_CLIENT_THREAD_NUM,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());// new 一個(gè)容量為 DEFAULT_QUEUE_CAPACITY 的數(shù)據(jù)緩沖隊(duì)列this.bufferQueue = Queues.newLinkedBlockingQueue(DEFAULT_QUEUE_CAPACITY);// 創(chuàng)建并開啟消費(fèi)者線程MultiThreadConsumerClient consumerClient = new MultiThreadConsumerClient(bufferQueue);for (int i=0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {threadPoolExecutor.execute(consumerClient);}}@Overridepublic void invoke(String value, Context context) throws Exception {// 往 bufferQueue 的隊(duì)尾添加數(shù)據(jù)bufferQueue.put(value);} }

代碼邏輯相對比較簡單,請問上述 Sink 能保證 Exactly Once 嗎?

答:不能保證 Exactly Once,Flink 要想端對端保證 Exactly Once,必須要求外部組件支持事務(wù),這里第三方接口明顯不支持事務(wù)。

那么上述 Sink 能保證 At Lease Once 嗎?言外之意,上述 Sink 會丟數(shù)據(jù)嗎?

答:會丟數(shù)據(jù)。因?yàn)樯鲜霭咐惺褂玫呐?api 來消費(fèi)數(shù)據(jù),假如批量 api 是每積攢 50 條數(shù)據(jù)請求一次第三方接口,當(dāng)做 Checkpoint 時(shí)可能只積攢了 30 條數(shù)據(jù),所以做 Checkpoint 時(shí)內(nèi)存中可能還有數(shù)據(jù)未發(fā)送到外部系統(tǒng)。而且數(shù)據(jù)緩沖隊(duì)列中可能還有緩存的數(shù)據(jù),因此上述 Sink 在做 Checkpoint 時(shí)會出現(xiàn) Checkpoint 之前的數(shù)據(jù)未完全消費(fèi)的情況。

例如,Flink 任務(wù)消費(fèi)的 Kafka 數(shù)據(jù),當(dāng)做 Checkpoint 時(shí),Flink 任務(wù)消費(fèi)到 offset 為 10000 的位置,但實(shí)際上 offset 10000 之前的一小部分?jǐn)?shù)據(jù)可能還在數(shù)據(jù)緩沖隊(duì)列中尚未完全消費(fèi),或者因?yàn)闆]積攢夠一定批次所以數(shù)據(jù)緩存在 client 中,并未請求到第三方。當(dāng)任務(wù)失敗后,Flink 任務(wù)從 Checkpoint 處恢復(fù),會從 offset 為 10000 的位置開始消費(fèi),此時(shí) offset 10000 之前的一小部分緩存在內(nèi)存緩沖隊(duì)列中的數(shù)據(jù)不會再被消費(fèi),于是就出現(xiàn)了丟數(shù)據(jù)情況。

處理丟數(shù)據(jù)情況

如何保證數(shù)據(jù)不丟失呢?很簡單,可以在 Checkpoint 時(shí)強(qiáng)制將數(shù)據(jù)緩沖區(qū)的數(shù)據(jù)全部消費(fèi)完,并對 client 執(zhí)行 flush 操作,保證 client 端不會緩存數(shù)據(jù)。

實(shí)現(xiàn)思路:Sink 算子可以實(shí)現(xiàn) CheckpointedFunction 接口,當(dāng)做 Checkpoint 時(shí),會調(diào)用 snapshotState 方法,方法內(nèi)可以觸發(fā) client 的 flush 操作。但 client 在 MultiThreadConsumerClient 對應(yīng)的五個(gè)線程中,需要考慮線程同步的問題,即:Sink 算子的 snapshotState 方法中做一個(gè)操作,要使得五個(gè) Client 線程感知到當(dāng)前正在執(zhí)行 Checkpoint,此時(shí)應(yīng)該把數(shù)據(jù)緩沖區(qū)的數(shù)據(jù)全部消費(fèi)完,并對 client 執(zhí)行過 flush 操作。

如何實(shí)現(xiàn)呢?需要借助 CyclicBarrier。CyclicBarrier 會讓所有線程都等待某個(gè)操作完成后才會繼續(xù)下一步行動。在這里可以使用 CyclicBarrier,讓 Checkpoint 等待所有的 client 將數(shù)據(jù)緩沖區(qū)的數(shù)據(jù)全部消費(fèi)完并對 client 執(zhí)行過 flush 操作,言外之意,offset 10000 之前的數(shù)據(jù)必須全部消費(fèi)完成才允許 Checkpoint 執(zhí)行完成。這樣就可以保證 Checkpoint 時(shí)不會有數(shù)據(jù)被緩存在內(nèi)存,可以保證數(shù)據(jù)源 offset 10000 之前的數(shù)據(jù)都消費(fèi)完成。

MultiThreadConsumerSink 具體代碼如下所示:

public class MultiThreadConsumerSink extends RichSinkFunction<String> {// Client 線程的默認(rèn)數(shù)量private final int DEFAULT_CLIENT_THREAD_NUM = 5;// 數(shù)據(jù)緩沖隊(duì)列的默認(rèn)容量private final int DEFAULT_QUEUE_CAPACITY = 5000;private LinkedBlockingQueue<String> bufferQueue;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// new 一個(gè)容量為 DEFAULT_CLIENT_THREAD_NUM 的線程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CLIENT_THREAD_NUM, DEFAULT_CLIENT_THREAD_NUM,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());// new 一個(gè)容量為 DEFAULT_QUEUE_CAPACITY 的數(shù)據(jù)緩沖隊(duì)列this.bufferQueue = Queues.newLinkedBlockingQueue(DEFAULT_QUEUE_CAPACITY);// 創(chuàng)建并開啟消費(fèi)者線程MultiThreadConsumerClient consumerClient = new MultiThreadConsumerClient(bufferQueue);for (int i=0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {threadPoolExecutor.execute(consumerClient);}}@Overridepublic void invoke(String value, Context context) throws Exception {// 往 bufferQueue 的隊(duì)尾添加數(shù)據(jù)bufferQueue.put(value);} }

MultiThreadConsumerSink 實(shí)現(xiàn)了 CheckpointedFunction 接口,在 open 方法中增加了 CyclicBarrier 的初始化,CyclicBarrier 預(yù)期容量設(shè)置為 client 線程數(shù)加一,表示當(dāng) client 線程數(shù)加一個(gè)線程都執(zhí)行了 await 操作時(shí),所有的線程的 await 方法才會執(zhí)行完成。這里為什么要加一呢?因?yàn)槌?client 線程外, snapshotState 方法中也需要執(zhí)行過 await。

當(dāng)做 Checkpoint 時(shí) snapshotState 方法中執(zhí)行 clientBarrier.await(),等待所有的 client 線程將緩沖區(qū)數(shù)據(jù)消費(fèi)完。snapshotState 方法執(zhí)行過程中 invoke 方法不會被執(zhí)行,即:Checkpoint 過程中數(shù)據(jù)緩沖隊(duì)列不會增加數(shù)據(jù),所以 client 線程很快就可以將緩沖隊(duì)列中的數(shù)據(jù)消費(fèi)完。

MultiThreadConsumerClient 具體代碼如下所示:

public class MultiThreadConsumerSink extends RichSinkFunction<String> implements CheckpointedFunction {private Logger LOG = LoggerFactory.getLogger(MultiThreadConsumerSink.class);// Client 線程的默認(rèn)數(shù)量private final int DEFAULT_CLIENT_THREAD_NUM = 5;// 數(shù)據(jù)緩沖隊(duì)列的默認(rèn)容量private final int DEFAULT_QUEUE_CAPACITY = 5000;private LinkedBlockingQueue<String> bufferQueue;private CyclicBarrier clientBarrier;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// new 一個(gè)容量為 DEFAULT_CLIENT_THREAD_NUM 的線程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CLIENT_THREAD_NUM, DEFAULT_CLIENT_THREAD_NUM,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());// new 一個(gè)容量為 DEFAULT_QUEUE_CAPACITY 的數(shù)據(jù)緩沖隊(duì)列this.bufferQueue = Queues.newLinkedBlockingQueue(DEFAULT_QUEUE_CAPACITY);// barrier 需要攔截 (DEFAULT_CLIENT_THREAD_NUM + 1) 個(gè)線程this.clientBarrier = new CyclicBarrier(DEFAULT_CLIENT_THREAD_NUM + 1);// 創(chuàng)建并開啟消費(fèi)者線程MultiThreadConsumerClient consumerClient = new MultiThreadConsumerClient(bufferQueue, clientBarrier);for (int i=0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {threadPoolExecutor.execute(consumerClient);}}@Overridepublic void invoke(String value, Context context) throws Exception {// 往 bufferQueue 的隊(duì)尾添加數(shù)據(jù)bufferQueue.put(value);}@Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {LOG.info("snapshotState : 所有的 client 準(zhǔn)備 flush !!!");// barrier 開始等待clientBarrier.await();}@Overridepublic void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {}}

從數(shù)據(jù)緩沖隊(duì)列中 poll 數(shù)據(jù)時(shí),增加了 timeout 時(shí)間為 50ms。如果從隊(duì)列中拿到數(shù)據(jù),則執(zhí)行消費(fèi)數(shù)據(jù)的邏輯,若拿不到數(shù)據(jù)說明數(shù)據(jù)緩沖隊(duì)列中數(shù)據(jù)消費(fèi)完了。此時(shí)需要判斷是否有等待的 CyclicBarrier,如果有等待的 CyclicBarrier 說明此時(shí)正在執(zhí)行 Checkpoint,所以 client 需要執(zhí)行 flush 操作。flush 完成后,Client 線程執(zhí)行 barrier.await() 操作。當(dāng)所有的 Client 線程都執(zhí)行到 await 時(shí),所有的 barrier.await() 都會被執(zhí)行完。此時(shí) Sink 算子的 snapshotState 方法就會執(zhí)行完。通過這種策略可以保證 Checkpoint 時(shí)將數(shù)據(jù)緩沖區(qū)中的數(shù)據(jù)消費(fèi)完,client 執(zhí)行 flush 操作可以保證 client 端不會緩存數(shù)據(jù)。

總結(jié)

分析到這里,我們設(shè)計(jì)的 Sink 終于可以保證不丟失數(shù)據(jù)了。對 CyclicBarrier 不了解的同學(xué)請 Google 或百度查詢。再次強(qiáng)調(diào)這里多線程的方案,不僅限于請求第三方接口,對于非 CPU 密集型的任務(wù)都可以使用該方案來提高 CPU 利用率,且該方案不僅限于 Sink 算子,各種算子都適用。本文主要希望幫助大家理解 Flink 中使用多線程的優(yōu)化及在 Flink 算子中使用多線程如何保證不丟數(shù)據(jù)。

原文鏈接

本文為阿里云內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載

總結(jié)

以上是生活随笔為你收集整理的udp怎么保证不丢包_在 Flink 算子中使用多线程如何保证不丢数据?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 自由成熟xxxx色视频 | japanesexxxx日本妞 | 成人久久电影 | 亚洲一区二区免费电影 | 天堂在线视频 | 成年人看片网站 | 少妇性生活视频 | 无码人妻一区二区三区在线 | 草av| av夜夜操 | 色综合天天综合 | 精品人妻一区二区三区在线视频 | 色哟哟在线观看视频 | 美日韩一区二区三区 | 亚洲中文一区二区三区 | 免费国产在线观看 | 国产电影一区二区三区爱妃记 | 亚洲va视频| 99久久婷婷国产综合精品电影 | 午夜色大片 | 亚洲aaaaaaa| 色美av| 欧美美女视频 | 美女户外露出 | 高清无码视频直接看 | 九热在线视频 | 欧美日韩一卡 | 欧美激情 一区 | 肉大榛一进一出免费视频 | 国产成人三级一区二区在线观看一 | 俺啪也 | 亚洲人成在线免费观看 | 青草草在线 | 一区二区传媒有限公司 | 好男人在线视频www 亚洲福利国产 | 久久精精品久久久久噜噜 | 在线香蕉视频 | 极品销魂美女一区二区 | 日本一级淫片色费放 | av在线浏览| 成人深夜在线观看 | 亚洲福利天堂 | 欧美日韩国产图片 | 偷拍亚洲视频 | 日本欧美国产一区二区三区 | 91丝袜一区在线观看 | www.久久久久久久久久 | 欲求不满在线小早川怜子 | 午夜精产品一区二区在线观看的 | 99免费 | 日本裸体动漫 | free性护士vidos猛交 | 欧美精品久久久久 | 亚洲午夜精品一区二区三区 | 草女人视频 | 色无极亚洲影院 | 久久久99精品国产一区二区三区 | 国产在线a| 久久久久www | 色婷婷综合五月 | 亚洲一区二区免费在线观看 | 无码精品一区二区三区在线播放 | 国产精品高潮呻吟久久久 | 黄色欧美网站 | 18视频在线观看网站 | 成人超碰| 亚洲免费在线观看av | 肉丝美足丝袜一区二区三区四 | 伊人精品视频 | 日日骚一区 | 欧美四区| 性欧美69| 91视频精品| 久久成人精品一区二区 | 激情午夜天 | 一区二区三区不卡视频在线观看 | 爱情岛亚洲首页论坛小巨 | 国产老头老太作爱视频 | 特色特色大片在线 | 免费福利在线视频 | 亚洲欧洲日产av | 日韩不卡av在线 | 免费在线观看黄色网址 | 两性午夜免费视频 | 性色欲网站人妻丰满中文久久不卡 | 五月天视频 | 最新av电影网站 | 青草青青视频 | 国产sm调教视频 | kk视频在线观看 | 胖女人做爰全过程 | www.青青草| 久久伊人国产 | 欧美性猛交xxxx乱大交俱乐部 | 日本美女视频一区 | 久久黄色 | 有码在线播放 | 夏晴子在线 | 中文字幕一区二区在线观看 |