日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

for循环数据量太大_中文文本分类roberta大力出奇迹之数据量大的问题

發(fā)布時(shí)間:2023/12/10 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 for循环数据量太大_中文文本分类roberta大力出奇迹之数据量大的问题 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

問(wèn)題描述: 筆者在文本分類(lèi)場(chǎng)景中使用了roberta+pool+dense的三分類(lèi)模型。采用預(yù)訓(xùn)練模型做項(xiàng)目的時(shí)候經(jīng)??嘤跀?shù)據(jù)太少,模型泛化性差,因此收集了1300W數(shù)據(jù)。在我嘗試暴力出奇跡的時(shí)候,遇到了部分問(wèn)題,在此記錄一下。

一. 數(shù)據(jù)預(yù)處理時(shí)間長(zhǎng)

盡管數(shù)據(jù)預(yù)處理步驟比較簡(jiǎn)單,一般也就清洗、分詞、token2id等操作(我一般把token2id放在預(yù)處理階段做),但是由于數(shù)據(jù)量比較大時(shí),也很耗時(shí)間。

1.分詞提速

jieba分詞開(kāi)啟并行很簡(jiǎn)單,一行代碼開(kāi)啟

jieba.enable_parallel(100)

但是這里注意,jieba并行的過(guò)程是一個(gè) jieba.cut()這一動(dòng)作中的,如果輸入的是多個(gè)句子,則jieba會(huì)并行處理多個(gè)句子。

# 錯(cuò)誤jieba.cut('我愛(ài)北京天安門(mén)。')jieba.cut('北京是中國(guó)首都。')#正確示范jieba.cut('我愛(ài)北京天安門(mén)。\n北京是中國(guó)首都。')

因此如果需要采用并行,需要先包裝多行數(shù)據(jù),再拆分出來(lái)。具體參考

temp_lines = texts[:100]_res = ' '.join((jieba.cut('\n'.join(temp_lines))))split_words = [_.lstrip(' ').rstrip(' ') for _ in _res.split('\n')]res.extend(split_words)
2. 函數(shù)并行

當(dāng)然,使用預(yù)訓(xùn)練模型不需要對(duì)中文進(jìn)行分詞,因此此處耗時(shí)主要在數(shù)據(jù)清洗,正則處理等操作上,所以方法1已經(jīng)不起作用啦。過(guò)程就是先讀取文件到內(nèi)存,然后分多個(gè)進(jìn)程預(yù)處理,最終獲取所有進(jìn)程處理結(jié)果,寫(xiě)入文件。

import multiprocessingimport mathdef func(id, texts): print('enter %d' % (id)) res = [] for i, text in enumerate(texts): if i % 10000 == 0 and i != 0: print(i, '/', id) value=tokenizer.encode(first=text, max_len=seq_len)[0] res.append(value) print('leave %d' % (id)) return id, res def quick_func(texts, func): pool = multiprocessing.Pool(processes=CPUS) results = [] for i in range(CPUS): imin = i*math.ceil(len(texts)/CPUS) imax = min((i+1)*math.ceil(len(texts)/CPUS), len(texts)) results.append(pool.apply_async(func, (i, texts[imin:imax],))) pool.close() pool.join() print("Sub-process(es) done.") res = [] for _ in results: res.append(_.get()) res = sorted(res, key=lambda x: x[0]) texts = [] for _ in res: texts.extend(_[1]) return texts

注意! func內(nèi)部出錯(cuò),子進(jìn)程直接結(jié)束,不raise錯(cuò)誤的。如果要調(diào)試,最好還是加上traceback查看問(wèn)題

在func中完全可以使用jieba的cut,不需要開(kāi)啟jieba并行。在處理數(shù)據(jù)500萬(wàn)左右的時(shí)候,各個(gè)進(jìn)程已經(jīng)結(jié)束,但是該函數(shù)遲遲不返回值,直到我htop查看內(nèi)存,發(fā)現(xiàn)內(nèi)存快速增長(zhǎng)到150G之后不在增長(zhǎng),程序也卡住啦,發(fā)現(xiàn)可能是 raw句子和處理后的結(jié)果占用內(nèi)存過(guò)高,因此這個(gè)方法也不行了。

3. 文件并行

查看資料的時(shí)候發(fā)現(xiàn),有老哥處理幾億行文件使用linux bash來(lái)預(yù)處理,速度明顯提升,我也想嘗試,但是最終由于功力不足,失敗。折中辦法,先將文件拆分成多個(gè)文件,python并行處理多個(gè)文件并輸出結(jié)果到多個(gè)文件,然后合并多個(gè)文件到單個(gè)文件。(注意,這里會(huì)丟失各行順序,如果去重需要在外部處理,僅僅是讀取寫(xiě)入文件單線程也還是挺快的)

并行讀取同一個(gè)文件或者并行寫(xiě)入同一個(gè)文件是危險(xiǎn)的,可能會(huì)寫(xiě)入或者讀取混亂(錯(cuò)誤)

import multiprocessingCPUS = 40IN_DATA_SPLIT_PREFIX = 'data-split-tmp-in'OUT_DATA_SPLIT_PREFIX = 'data-split-tmp-out'seq_len = 256 # 文本最大長(zhǎng)度vocab_path = '../../basedata/chinese_wwm_ext_L-12_H-768_A-12/vocab.txt'tokenizer = LimitBertTokenizer(vocab_path) # text2 *4 < text1char_tool = CharTools()def _clean_tokenize(infile, outfile, filters=[]): fin = open(infile, 'r') fout = open(outfile, 'w') for i, line in enumerate(fin): if i % 10000 == 0 and i != 0: print(i, ' / ', infile) items = line.strip().split('\t') ###########------------######### if len(items) != 6: continue object_id, _, operator_id, title, content, action = items type = 'model' if operator_id == '0' else 'human' if type in filters: continue if action not in action2label.keys(): continue label = action2label[action] title = char_tool.clean(title) content = char_tool.clean(content) title = title[:seq_len] content = content[:seq_len] wordids, segmentids = tokenizer.encode( first=content, second=title, max_len=seq_len) fout.write(json.dumps( {'type': type, 'label': label, 'wordids': wordids, 'segmentids': segmentids})+'\n') ###########------------######### fin.close() fout.close()def parallel(_func, infile, outfile, filters=[]): os.system('split -n l/%d %s %s' % (CPUS, infile, IN_DATA_SPLIT_PREFIX)) print("split files done") pool = multiprocessing.Pool(processes=CPUS) for small_data_file_in in [_ for _ in os.listdir('.') if _.startswith(IN_DATA_SPLIT_PREFIX)]: small_data_file_out = small_data_file_in.replace( IN_DATA_SPLIT_PREFIX, OUT_DATA_SPLIT_PREFIX) pool.apply_async(_func, args=( small_data_file_in, small_data_file_out, filters,)) pool.close() pool.join() print("Sub-process(es) done.") os.system('cat %s* > %s' % (OUT_DATA_SPLIT_PREFIX, outfile)) os.system('rm %s*' % (IN_DATA_SPLIT_PREFIX)) os.system('rm %s*' % (OUT_DATA_SPLIT_PREFIX)) print("done.")

二. numpy加載后占用內(nèi)存太大

之前由于機(jī)器內(nèi)存夠用+數(shù)據(jù)量不算太大,在訓(xùn)練過(guò)程中我都是加載前文處理的json文件為numpy數(shù)據(jù)然后使用model.fit()進(jìn)行訓(xùn)練的,代碼如下

def load_from_json(filename): labels = [] wordids = [] segmentids = [] with open(filename, 'r') as f: for i, line in enumerate(f): if i % 100000 == 0 and i != 0: print('載入數(shù)據(jù):%d ' % i) item = json.loads(line.strip()) labels.append(item['label']) wordids.append(item['wordids']) wordids = np.array(wordids) segmentids = np.zeros((len(labels), seq_len), int) labels = tf.keras.utils.to_categorical(labels) [train_wordids, val_wordids, train_segmentids, val_segmentids, train_label3s, val_label3s] = train_test_split(wordids, segmentids, label3s, test_size=0.01, stratify=labels,random_state=0) return [[train_wordids, train_segmentids], [train_label3s], [val_wordids, val_segmentids], [val_label3s]]train_X,train_y,val_X,val_y=load_from_json()model.fit(train_X, train_Y, validation_data=(val_X, val_Y),)

直到,我遇到了千萬(wàn)數(shù)據(jù)集,首先讀取后占用機(jī)器超過(guò)150G內(nèi)存,另外python提示單個(gè)變量占用超過(guò)10%內(nèi)存,就此程序卡住,因此不得不更換方法。

  • tf.data: 官方推薦的方法,但是我感覺(jué)使用json或者re都不是很方便,加上tf.function寫(xiě)起來(lái)不是很方便,放棄。
  • data.generator:一般generator很常見(jiàn),但是很多人使用的時(shí)候都是把數(shù)據(jù)完全讀進(jìn)內(nèi)存,然后在generator中實(shí)現(xiàn)shuffle和輸出batch的功能(就沒(méi)有g(shù)enerator的作用啦),這里由于數(shù)據(jù)量太大,明顯是不能讀取所有數(shù)據(jù)進(jìn)內(nèi)存的。為了保持shuffle的功能,這里還是順序讀取文件,但是維持一個(gè)buffer, 在buffer中對(duì)數(shù)據(jù)進(jìn)行shuffle。
class DataGenerator(): # 讀取 generator def __init__(self, json_file, batch_size=2, min_buffer_size=200000, max_buffer_size=300000, shuffle=True): self.f = open(json_file, 'r') self.batch_size = batch_size file_len = int(os.popen('wc -l %s' % json_file).read().split()[0]) self.len = math.ceil(file_len / batch_size) self.buffer_lines = [] self.max_buffer_size = max_buffer_size self.min_buffer_size = min_buffer_size self.shuffle = shuffle self.check_load() def __len__(self): return self.len def _read_line(self): """獲取一行數(shù)據(jù)""" line = self.f.readline() if not line: self.f.seek(0) line = self.f.readline() return line def check_load(self): """保證buffer中的數(shù)據(jù)量滿足要求""" if len(self.buffer_lines) > self.min_buffer_size: return else: while len(self.buffer_lines) <= self.max_buffer_size: self.buffer_lines.append(self._read_line()) if self.shuffle: random.shuffle(self.buffer_lines) def _handle(self, lines): pass def __iter__(self): while True: self.check_load() lines, self.buffer_lines = self.buffer_lines[:self.batch_size], self.buffer_lines[self.batch_size:] yield self._handle(lines)class MyDataGenerator(DataGenerator): def _handle(self, lines): word_ids, segment_ids, labels = [], [], [] for line in lines: item = json.loads(line.strip()) labels.append(item['label']) word_ids.append(item['wordids']) segment_ids.append(item['segmentids']) word_ids = np.array(word_ids) segment_ids = np.array(segment_ids) labels = tf.keras.utils.to_categorical(labels, num_classes=3) return [word_ids, segment_ids], labelstrain_data = MyDataGenerator(params.tain_file, batch_size=params.finetune_batch_size*gpus, min_buffer_size=100000, max_buffer_size=300000)val_data = MyDataGenerator(params.val_file, batch_size=params.finetune_batch_size*gpus, min_buffer_size=0, max_buffer_size=100000, shuffle=False)model.fit(iter(train_data), validation_data=iter(val_data), steps_per_epoch=len(train_data), validation_steps=len(val_data))

具體實(shí)現(xiàn)了一個(gè)DataGenerator父類(lèi),其必須包含數(shù)據(jù)條目(可返回batch個(gè)數(shù)),因?yàn)閙odel.fit中需要指定其迭代次數(shù)。為了保證generaotr持續(xù)有輸出,在讀取文件到末尾的時(shí)候,自動(dòng)返回文件頭。另外由于是在buffer中shuffle,其不能保證文件中的各行只輸出一次(但是能保證一個(gè)epoch最多max_buffer_size個(gè)重復(fù)的),需要依據(jù)數(shù)據(jù)條目酌情設(shè)置,這里應(yīng)該優(yōu)化,在達(dá)到文件末尾后等全量buffer清空后在seed到文件頭。另外,實(shí)現(xiàn)了子類(lèi),具體實(shí)現(xiàn)lines到numpy的操作。其實(shí)也可以把數(shù)據(jù)預(yù)處理和token2id放在這里,但是每個(gè)epoch都要處理一次,有點(diǎn)浪費(fèi)時(shí)間,因此習(xí)慣把所有預(yù)處理和toekn2id放到train前的預(yù)處理腳本中。

三. 模型訓(xùn)練速度慢,需要多卡訓(xùn)練

在tf2之后并行更簡(jiǎn)單了,代碼如下:

import tensorflow as tffrom keras_bert import load_trained_model_from_checkpointdef create_model(bert_train=False): bert = load_trained_model_from_checkpoint( config_path, checkpoint_path, training=False, trainable=bert_train, seq_len=SEQ_LEN,) inputs = bert.inputs[:2] dense = bert.get_layer('Encoder-12-FeedForward-Norm').output dense = tf.keras.layers.Lambda(lambda x: x[:, 1:, :])(dense) dense1 = tf.keras.layers.GlobalMaxPool1D()(dense) dense2 = tf.keras.layers.GlobalAveragePooling1D()(dense) dense = tf.keras.layers.Concatenate()([dense1, dense2]) dense = tf.keras.layers.Dense(params.dnn_units, activation='relu')(dense) dense = tf.keras.layers.Dropout(rate=params.dropout)(dense) output = tf.keras.layers.Dense( units=3, activation='softmax', name='3cls')(dense) model = tf.keras.models.Model(inputs, output) return modelos.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5,6,7"gpus = tf.config.experimental.list_physical_devices(device_type='GPU')for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True)gpus = len(os.environ["CUDA_VISIBLE_DEVICES"].split(','))strategy = tf.distribute.MirroredStrategy()with strategy.scope(): model = create_model(bert_train=False) scheduler = tf.keras.callbacks.ReduceLROnPlateau( monitor='val_loss', factor=0.5, patience=int(params.fit_opt_patience), min_delta=1e-7) loss = LossGenerate(params.model_loss) metrics = ['accuracy'] optimizer = tf.keras.optimizers.Adam(params.fit_lr) csvlogger = tf.keras.callbacks.CSVLogger(os.path.join( params.model_dir, 'log.tsv'), append=True, separator='\t') earlystop = tf.keras.callbacks.EarlyStopping( monitor='val_loss', patience=params.fit_patience) checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(params.model_dir, 'stage1.weight.h5'), save_weights_only=True, save_best_only=True) model.compile(loss=loss, metrics=metrics, optimizer=optimizer)

只需要在strategy.scope()下定義模型就行,很簡(jiǎn)單。但是我也遇到一個(gè)問(wèn)題: 在預(yù)測(cè)時(shí),在strategy.scope()加載存儲(chǔ)的模型文件報(bào)錯(cuò):

from keras_bert import get_custom_objectscustom_objects = get_custom_objects()with strategy.scope(): model = tf.keras.models.load_model( model_path, custom_objects=custom_objects)# 報(bào)錯(cuò)

具體錯(cuò)誤google很久也沒(méi)有結(jié)果,最終發(fā)現(xiàn)在strategy.scope下載入權(quán)重文件是可以的(可能是哪里實(shí)現(xiàn)兼容性不強(qiáng)吧),代碼:

with strategy.scope(): model = create_model(bert_train=False) model.load_weights(os.path.join(params.model_dir, 'stage1.weight.h5'))

實(shí)驗(yàn)結(jié)果

最終,在6卡v100并行下, 1000萬(wàn)長(zhǎng)度384的分類(lèi)模型訓(xùn)練好啦。stage1為固定bert訓(xùn)練結(jié)果, 01-0.4238為所有參數(shù)train的結(jié)果。發(fā)現(xiàn)了:1000W數(shù)據(jù),max-len設(shè)置為384, RoBERTa-wwm-ext 模型訓(xùn)練需要接近25小時(shí)。其實(shí)還是蠻快的.... 另外:?大力出奇跡的模型效果還可以!!!

為了湊夠1萬(wàn)字,放一下上文用到的LossGenerate函數(shù)

def LossGenerate(name='ce', *args, **kwargs): NAMES = ('ce', 'focal', 'dmi') kwargs = locals()['kwargs'] assert (name in NAMES), ' loss not defined!!!' if name == 'ce': return tf.keras.losses.CategoricalCrossentropy() if name == 'focal': gamma = kwargs.get('gamma', 2.) alpha = kwargs.get('alpha', 0.25) def categorical_focal_loss_fixed(y_true, y_pred): y_pred /= K.sum(y_pred, axis=-1, keepdims=True) epsilon = K.epsilon() y_pred = K.clip(y_pred, epsilon, 1. - epsilon) cross_entropy = -y_true * K.log(y_pred) loss = alpha * K.pow(1 - y_pred, gamma) * cross_entropy return K.mean(loss, axis=1) return categorical_focal_loss_fixed if name == 'dmi': def dmi_loss(y_true, y_pred): y_true = tf.transpose(y_true, perm=[1, 0]) mat = tf.matmul(y_true, y_pred) loss = -1.0 * tf.math.log(tf.math.abs(tf.linalg.det(mat)) + 0.001) return loss return dmi_loss 創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)

總結(jié)

以上是生活随笔為你收集整理的for循环数据量太大_中文文本分类roberta大力出奇迹之数据量大的问题的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。