python 排序统计滤波器_马尔可夫链+贝叶斯滤波器的Python展示
知乎上已經(jīng)有很多的學(xué)習(xí)筆記,但讀完后總有一種這東西不是我的我理解不了的感覺,所以想試著寫一篇文章來加深一下自己的理解,也記錄下學(xué)習(xí)中的盲點(diǎn)。
非常推薦大家去Github看一個(gè)項(xiàng)目:
https://github.com/rlabbe/filterpy?github.com#下面的代碼也是完全基于上述作者的庫函數(shù)完成的,所以需要先去Github下載庫函數(shù)安裝,或者直接使用 pip install filterpy #但是作者忘記把離散的白噪聲函數(shù)放進(jìn)庫里,那段函數(shù)可以在書里找到,或者改天我可以放在網(wǎng)盤里分享出來 #不影響下面代碼展示這里除了有卡爾曼濾波外還有各種貝葉斯濾波的python實(shí)現(xiàn),作者也寫了一本書開源在Github詳細(xì)介紹了卡爾曼,本文就算是讀書筆記吧。下面是作者發(fā)布的教材:
Jupyter Notebook Viewer?nbviewer.jupyter.org太高估自己的效率,聽了128次大田后生仔+n首其他終于趕在沒有眼瞎前停工了。
下面代碼基本是從原作者書里粘來的,我只改了某些錯(cuò)誤和可視化以及一些小的細(xì)節(jié)(動(dòng)圖做的還不夠好)作者在書中試圖用追蹤狗在走廊中移動(dòng)的模型來解釋離散貝葉斯濾波,由先驗(yàn)概率到后驗(yàn)概率再到引入反饋器這一思路詳細(xì)的介紹了貝葉斯濾波器并且實(shí)現(xiàn)了python的編程。同時(shí)作者一直在向我們強(qiáng)調(diào)學(xué)會(huì)數(shù)學(xué)編程的重要性,本人理解的不夠深刻,大家可以去翻閱原作者的書去看。
為什么我一直想實(shí)現(xiàn)貝葉斯濾波或者卡爾曼濾波,因?yàn)樗且环N動(dòng)態(tài)濾波器,它可以用來建立動(dòng)態(tài)的反演模型,這和我以前接觸的反演很不一樣,我曾經(jīng)做過利用光學(xué)遙感建立泥沙反演模型和利用兩層BP神經(jīng)網(wǎng)絡(luò)對地震波建立反演模型找地下礦層的兩個(gè)小東西,以上兩個(gè)反演模型都是靜態(tài)的過程,而卡爾曼濾波最反智的則是在濾波中不斷更新模型參數(shù)(加入了后饋過程也就是K卡爾曼增益矩陣),這讓我困惑了特別久。
如果大家也對這種濾波器感興趣建議先學(xué)些統(tǒng)計(jì)知識(shí)和最小二乘法,卡爾曼濾波和最小二乘法之間有著很緊密地聯(lián)系,并且最小二乘法方程可以很好的幫助你理解最小均方誤差,這是實(shí)現(xiàn)卡爾曼濾波的重要假設(shè)。
下面也會(huì)放一段簡單馬爾可夫鏈的python小程序,如果你和我一樣用Sublime ,多試幾次Ctrl + B,這是基于隨機(jī)數(shù)做的,但是你會(huì)發(fā)現(xiàn)在多次迭代后概率會(huì)收斂到一個(gè)定值,還記得統(tǒng)計(jì)課上講的大數(shù)定律嘛,在樣本空間足夠大時(shí),依概率收斂到某個(gè)數(shù),大約就是這個(gè)意思。所以在隨機(jī)天氣生成器中需要用蒙托卡羅思想來添加擾動(dòng)來破壞這種收斂性?,我上一篇文章中也寫道過,但是我想錯(cuò)了,并不是對轉(zhuǎn)移概率矩陣添加擾動(dòng),而是對初始狀態(tài)添加擾動(dòng)(最近聽了幾個(gè)MCMC方法的讀書匯報(bào),發(fā)現(xiàn)了這個(gè)秘密)
有關(guān)于今天統(tǒng)計(jì)課上講到的氣候序列均一化我也想說點(diǎn)兒給某位不來上課的同學(xué),我認(rèn)為卡爾曼濾波器可以很好的應(yīng)用到均一化過程中,具體方法我不說,啦啦啦。
#離散貝葉斯濾波器的實(shí)現(xiàn)展示 #The Kalman filter belongs to a family of filters called Bayesian filters. #NMEFC YuHao import numpy as np import matplotlib.pyplot as plt from filterpy.discrete_bayes import normalize from filterpy.discrete_bayes import predict from filterpy.discrete_bayes import updatex = range(10) hallway = np.array([1,1,0,0,0,0,0,0,1,0]) #Tracking a Dogdef update_belief(hall,belief,z,correct_scale):for i ,val in enumerate(hall):if val == z:belief[i] *= correct_scale belief = np.array([0.1]*10) reading = 1 update_belief(hallway,belief,z = reading,correct_scale = 3.) print("belief:",belief) print("sum = ",sum(belief)) plt.figure() plt.bar(x,belief) plt.title("belief") plt.show()#歸一化 homogeneity_array = belief/sum(belief) print(homogeneity_array) print(hallway == 1 ) #計(jì)算后驗(yàn)概率 def scale_update(hall ,belief, z, z_prob):scale = z_prob / (1. - z_prob)belief[hall == z] *= scalenormalize(belief) belief = np.array([0.1]*10) scale_update(hallway ,belief ,z=1,z_prob = .75) print('sum =',sum(belief)) print("probability of door =",belief[0]) print("probability of wall =", belief[2]) plt.figure() plt.bar(x,belief) plt.title("belief") plt.show()def scale_update(hall,belief,z,z_prob):scale = z_prob / (1. - z_prob)likelihood = np.ones(len(hall))likelihood[hall == z] *= scalereturn normalize(likelihood * belief) def lh_hallway(hall, z, z_prob):#compute likelihood that a measurement matches#positions in the hallway.try:scale = z_prob / (1. - z_prob)except ZeroDivisionError:scale = 1e8likelihood = np.ones(len(hall))likelihood[hall==z] *= scalereturn likelihoodbelief = np.array([0.1] * 10) likelihood = lh_hallway(hallway, z=1, z_prob=.75) print(normalize(likelihood * belief)) def perfect_predict(belief,move):n = len(belief)result = np.zeros(n)for i in range(n):result[i] = belief[(i-move) % n]return result belief = np.array([.35,.1,.2,.3,0,0,0,0,0,.05])belief = perfect_predict(belief,1) plt.bar(x,belief) plt.title("belief") plt.show() #添加噪聲def predict_move(belief, move, p_under, p_correct, p_over):n = len(belief)prior = np.zeros(n)for i in range(n):prior[i] = (belief[(i-move) % n] * p_correct +belief[(i-move-1) % n] * p_over +belief[(i-move+1) % n] * p_under) return priorbelief = [0., 0., 0., 1., 0., 0., 0., 0., 0., 0.] prior = predict_move(belief, 2, .1, .8, .1) print(prior) fig = plt.figure() ax1 = fig.add_subplot(1,2,1) ax2 = fig.add_subplot(1,2,2) ax1.bar(x,belief) ax2.bar(x,prior) ax1.set_title("belief") ax2.set_title("prior") ax1.set_xticks(x) ax2.set_xticks(x) plt.show() belief = [0, 0, .4, .6, 0, 0, 0, 0, 0, 0] prior = predict_move(belief, 2, .1, .8, .1) fig = plt.figure() ax1 = fig.add_subplot(1,2,1) ax2 = fig.add_subplot(1,2,2) ax1.bar(x,belief) ax2.bar(x,prior) ax1.set_title("belief") ax2.set_title("prior") ax1.set_xticks(x) ax2.set_xticks(x) plt.show()belief = np.array([1.0,0,0,0,0,0,0,0,0,0]) beliefs = [] for i in range(100):belief = predict_move(belief,1,.1,.8,.1)beliefs.append(belief) print("Final Belief:",belief) print(len(beliefs)) print(beliefs[1]) for i in range(len(beliefs)):plt.cla()plt.bar(x,beliefs[i])plt.xticks(x)plt.pause(0.05)plt.show() """ #卷積泛化 #卷積算法的python描述,但是時(shí)間復(fù)雜度太高,pass def predict_move_convolution(pdf ,offset,kernel):N = len(pdf)kN = len(kernel)width = int((kN - 1)/2)prior = np.zeros(N)for i in range(N):for k in range(kN):index = (i + (width - k)-offset)%Nprior[i] += pdf[index] * kernel[k]return prior """belief = [.05, .05, .05, .05, .55, .05, .05, .05, .05, .05] prior = predict(belief,offset = 1,kernel = [.1,.8,.1]) fig = plt.figure() ax1 = fig.add_subplot(1,2,1) ax2 = fig.add_subplot(1,2,2) ax1.bar(x,belief) ax2.bar(x,prior) ax1.set_title("belief") ax2.set_title("prior") ax1.set_xticks(x) ax2.set_xticks(x) plt.show() prior = predict(belief, offset=3, kernel=[.05, .05, .6, .2, .1]) #make sure that it shifts the positions correctly for movements greater than one and for asymmetric kernels #making a prediction of where the dog is moving, and convolving the probabilities to get the priorfig = plt.figure() ax1 = fig.add_subplot(1,2,1) ax2 = fig.add_subplot(1,2,2) ax1.bar(x,belief) ax2.bar(x,prior) ax1.set_title("belief") ax2.set_title("prior") ax1.set_xticks(x) ax2.set_xticks(x) plt.show() """ #Integrating Measurements and Movement Updates """ """ 請仔細(xì)閱讀下一段話 Let's think about this intuitively.Consider a simple case - you are tracking a dog while he sits still.During each prediction you predict he doesn't move.Your filter quickly converges on an accurate estimate of his position.Then the microwave in the kitchen turns on, and he goes streaking off.You don't know this, so at the next prediction you predict he is in the same spot.But the measurements tell a different story.As you incorporate the measurements your belief will be smeared along the hallway, leading towards the kitchen.On every epoch (cycle) your belief that he is sitting still will get smaller,and your belief that he is inbound towards the kitchen at a startling rate of speed increases. 最精彩的部分來了:反饋器""" tank1 = [] tank2 = []hallway = np.array([1, 1, 0, 0, 0, 0, 0, 0, 1, 0]) prior = np.array([.1] * 10) likelihood = lh_hallway(hallway, z=1, z_prob=.75) posterior = normalize(likelihood * prior) tank2.append(posterior) tank1.append(prior) fig = plt.figure() ax1 = fig.add_subplot(1,2,1) ax2 = fig.add_subplot(1,2,2) ax1.bar(x,prior) ax2.bar(x,posterior) ax1.set_title("prior") ax2.set_title("posterior") ax1.set_xticks(x) ax1.set_ylim(0,0.5) ax2.set_ylim(0,0.5) ax2.set_xticks(x) plt.show()#After the first update we have assigned a high probability to each door position, and a low probability to each wall position. prior = np.array([.1] * 10) kernel = (.1, .8, .1) prior = predict(posterior, 1, kernel) tank1.append(posterior) tank2.append(prior) fig = plt.figure() ax1 = fig.add_subplot(1,2,1) ax2 = fig.add_subplot(1,2,2) ax1.bar(x,prior) ax2.bar(x,posterior) ax1.set_title("posterior") ax2.set_title("prior") ax1.set_xticks(x) ax1.set_ylim(0,0.5) ax2.set_ylim(0,0.5) ax2.set_xticks(x) plt.show() #The predict step shifted these probabilities to the right, smearing them about a bit. Now let's look at what happens at the next sense.likelihood = lh_hallway(hallway, z=1, z_prob=.75) posterior = update(likelihood, prior) tank1.append(prior) tank2.append(posterior) fig = plt.figure() ax1 = fig.add_subplot(1,2,1) ax2 = fig.add_subplot(1,2,2) ax1.bar(x,prior) ax2.bar(x,posterior) ax1.set_title("prior") ax2.set_title("posterior") ax1.set_xticks(x) ax1.set_ylim(0,0.5) ax2.set_ylim(0,0.5) ax2.set_xticks(x) plt.show() #Notice the tall bar at position 1. This corresponds with the (correct) case of starting at position 0, sensing a door, shifting 1 to the right, and sensing another door. No other positions make this set of observations as likely. Now we will add an update and then sense the wall. prior = predict(posterior, 1, kernel) likelihood = lh_hallway(hallway, z=0, z_prob=.75) posterior = update(likelihood, prior) tank1.append(prior) tank2.append(posterior) fig = plt.figure() ax1 = fig.add_subplot(1,2,1) ax2 = fig.add_subplot(1,2,2) ax1.bar(x,prior) ax2.bar(x,posterior) ax1.set_title("prior") ax2.set_title("posterior") ax1.set_xticks(x) ax1.set_ylim(0,0.5) ax2.set_ylim(0,0.5) ax2.set_xticks(x) plt.show() #This is exciting! We have a very prominent bar at position 2 with a value of around 35%. It is over twice the value of any other bar in the plot, and is about 4% larger than our last plot, where the tallest bar was around 31%. Let's see one more cycle. prior = predict(posterior, 1, kernel) likelihood = lh_hallway(hallway, z=0, z_prob=.75) posterior = update(likelihood, prior) prior = predict(posterior, 1, kernel) likelihood = lh_hallway(hallway, z=0, z_prob=.75) posterior = update(likelihood, prior) tank1.append(prior) tank2.append(posterior) fig = plt.figure() ax1 = fig.add_subplot(1,2,1) ax2 = fig.add_subplot(1,2,2) ax1.bar(x,prior) ax2.bar(x,posterior) ax1.set_title("prior") ax2.set_title("posterior") ax1.set_xticks(x) ax1.set_ylim(0,0.5) ax2.set_ylim(0,0.5) ax2.set_xticks(x) plt.show() print(tank1) print(tank2) for i in range(len(tank1)):plt.cla()plt.subplot(1,2,1)plt.bar(x,tank1[i])plt.xticks(x)plt.ylim(0,0.5)plt.subplot(1,2,2)plt.bar(x,tank2[i])plt.xticks(x)plt.ylim(0,0.5)plt.pause(0.5)plt.show() #離散貝葉斯算法備注添加的可能不是很準(zhǔn)確,有問題大家可以留言。
import numpy as np import random as rmstate = ["sleep","Icecream","Run"] transitionName = [["SS","SR","SI"],["RS","RR","RI"],["IS","IR","II"]] transitionMatrix = [[0.2,0.6,0.2],[0.1,0.6,0.3],[0.2,0.7,0.1]] if sum(transitionMatrix[0]) + sum(transitionMatrix[1]) + sum(transitionMatrix[2]) !=3:print("Wrong") else:print("All is gonna be okay,you should move on!!;)")def activity_forecast(days):activityToday = "Sleep"#print("Start_state:" + activityToday)activityList = [activityToday]i = 0prob = 1while i != days:if activityToday == "Sleep":change = np.random.choice(transitionName[0],replace = True,p =transitionMatrix[0])if change == "SS":prob = prob * 0.2activityList.append("Sleep")passelif change == "SR":prob = prob * 0.6activityToday = "Sleep"activityList.append("Sleep")else:prob = prob *0.2activityToday = "Icecream"activityList.append("Icecream")elif activityToday == "Run":change = np.random.choice(transitionName[1],replace = True ,p = transitionMatrix[1])if change == "RR":prob = prob * 0.5activityList.append("Run")passelif change == "RS":prob = prob *0.2activityToday = "Sleep"activityList.append("Sleep")else:prob = prob * 0.3activityToday = "Icecream"activityList.append("Icecream")elif activityToday == "Icecream":change = np.random.choice(transitionName[2],replace = True,p = transitionMatrix[2])if change == "II":prob = prob *0.1activityList.append("Icecream")passelif change == "IS":prob = prob * 0.2activityToday = "Sleep"activityList.append("Sleep")else:prob = prob * 0.7activityToday = "Run"activityList.append("Run")i += 1return activityList list_activity = [] count = 0 for iterations in range(1,10000):list_activity.append(activity_forecast(2)) for smaller_list in list_activity:if(smaller_list[2] == "Sleep"):count += 1 percentage = (count/10000) * 100print("the probability of starting at state:'Sleep' and ending at state:'Sleep' = " + str(percentage) + "%")#馬爾可夫鏈展示能看到這兒說明你對我的文章很有興趣,那么有任何問題可以通過任何方式問我。
日常吐槽:
最近效率真是低的可怕,可我睡不著啊。。。
休息不好吃又吃不下要掉體重了,為了回家不讓老媽發(fā)現(xiàn)(不然要被瘋狂投食)牛奶得換全脂來保證熱量攝入了。
來吧,都看到這兒了陪我聽首歌啊:
キミがいれば(世紀(jì)末ヴァージョン)?music.163.com柯姓男孩永不認(rèn)輸
總結(jié)
以上是生活随笔為你收集整理的python 排序统计滤波器_马尔可夫链+贝叶斯滤波器的Python展示的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: deinstall 卸载grid_卸载O
- 下一篇: 结束python服务器进程_服务器端后台