原博客地址:http://blog.csdn.net/sinat_34596644/article/details/52599688
前言:隨著一個系統被用戶認可,業務量、請求量不斷上升,那么單機系統必然就無法滿足了,于是系統就慢慢走向分布式了,隨之而來的是系統之間“溝通”的障礙。一般來說,解決系統之間的通信可以有兩種方式:即遠程調用和消息。RMI(Remote Method Invocation)就是遠程調用的一種方式,也是這篇文章主要介紹的。
一、RMI的一個簡單示例
這個示例拆分為服務端和客戶端,放在兩個idea項目中,并且通過了單機和雙機兩種環境的測試,是真正意義上的分布式應用。
項目結構
服務端應用: Server
主程序: ? ? ? ?com.jnu.wwt.entry.Server
服務接口: ? ? com.jnu.wwt.service.IOperation
服務實現: ? ? com.jnu.wwt.service.impl.OperationImpl
客戶端應用: Client
主程序: ? ? ? ?com.jnu.wwt.entry.Client
服務接口: ? ?com.jnu.wwt.service.IOperation
源碼:
Server.java
/**
* Created by wwt on 2016/9/14.
*/
public class Server {
public static void main (String args[])
throws Exception{
//以1099作為LocateRegistry接收客戶端請求的端口,并注冊服務的映射關系
Registry registry=LocateRegistry.
createRegistry (
1099 )
;
IOperation iOperation=
new OperationImpl()
;
Naming.
rebind (
"rmi://127.0.0.1:1099/Operation" , iOperation)
;
System.
out .println(
"service running..." )
;
}}
IOperation.java(服務端和客戶端各需要一份)
/**
* 服務端接口必須實現java.rmi.Remote
* Created by wwt on 2016/9/14.
*/
public interface IOperation
extends Remote{
/**
* 遠程接口上的方法必須拋出RemoteException,因為網絡通信是不穩定的,不能吃掉異常
* @param a
* @param b
* @return
*/
int add (
int a
, int b)
throws RemoteException
;
}
OperationImpl.java
/**
* Created by wwt on 2016/9/14.
*/
public class OperationImpl
extends UnicastRemoteObject
implements IOperation{
public OperationImpl ()
throws RemoteException {
super ()
;
}
@Override
public int add (
int a
, int b)
throws RemoteException{
return a+b
;
}}
Client.java
/**
* Created by wwt on 2016/9/15.
*/
public class Client {
public static void main (String args[])
throws Exception{IOperation iOperation= (IOperation) Naming.
lookup (
"rmi://127.0.0.1:1099/Operation" )
;
System.
out .println(iOperation.add(
1 , 1 ))
;
}}
運行結果
先運行Server應用,服務就起來了。然后切換到Client應用,點擊運行,Client調用Server的服務,返回結果。
二、RMI做了些什么
現在我們先忘記Java中有RMI這種東西。假設我們需要自己實現上面例子中的效果,怎么辦呢?可以想到的步驟是:
編寫服務端服務,并將其通過某個服務機的端口暴露出去供客戶端調用。 編寫客戶端程序,客戶端通過指定服務所在的主機和端口號、將請求封裝并序列化,最終通過網絡協議發送到服務端。 服務端解析和反序列化請求,調用服務端上的服務,將結果序列化并返回給客戶端。 客戶端接收并反序列化服務端返回的結果,反饋給用戶。
這是大致的流程,我們不難想到,RMI其實也是幫我們封裝了一些細節而通用的部分,比如序列化和反序列化,連接的建立和釋放等,下面是RMI的具體流程:
這里涉及到幾個新概念:
Stub和Skeleton :這兩個的身份是一致的,都是作為代理的存在??蛻舳说姆Q作Stub,服務端的稱作Skeleton。要做到對程序員屏蔽遠程方法調用的細節,這兩個代理是必不可少的,包括網絡連接等細節。
Registry :顧名思義,可以認為Registry是一個“注冊所”,提供了服務名到服務的映射。如果沒有它,意味著客戶端需要記住每個服務所在的端口號,這種設計顯然是不優雅的。
三、走進RMI原理之前,先來看看用到的類及其層次結構和主要的方法。
哪里看不懂隨時回來看看結構。。。開始了
四、一步步解剖RMI的底層原理
Registry registry=LocateRegistry.
createRegistry (
1099 )
; 從上面這句代碼入手,追溯下去,可以發現服務端創建了一個RegistryImpl對象,這里做了一個判斷。如果服務端指定的端口號是1099并且系統開啟了安全管理器,那么可以在限定的權限集內(listen和accept)繞過系統的安全校驗。反之則必須進行安全校驗。這里純粹是為了效率起見。真正做的事情在setUp()方法中,繼續看下去。 public RegistryImpl(
final int var1)
throws RemoteException {
if (var1 ==
1099 && System.getSecurityManager() !=
null ) {
try {AccessController.doPrivileged(
new PrivilegedExceptionAction() {
public Void run()
throws RemoteException {LiveRef var1x =
new LiveRef(RegistryImpl.id
, var1)
;
RegistryImpl.
this .setup(
new UnicastServerRef(var1x))
;
return null;
}}
, (AccessControlContext)
null, new Permission[]{
new SocketPermission(
"localhost:" + var1
, "listen,accept" )})
;
}
catch (PrivilegedActionException var3) {
throw (RemoteException)var3.getException()
;
}}
else {LiveRef var2 =
new LiveRef(id
, var1)
;
this .setup(
new UnicastServerRef(var2))
;
}}
setUp()方法將指向正在初始化的RegistryImpl對象的遠程引用ref(RemoteRef)賦值為傳入的UnicastServerRef對象,這里涉及了向上轉型。然后繼續移交UnicastServerRef的exportObject()方法。 private void setup(UnicastServerRef var1)
throws RemoteException {
this .ref = var1
;
var1.exportObject(
this, (Object)
null, true )
;
}
進入UnicastServerRef的exportObject()方法??梢钥吹?#xff0c;這里首先為傳入的RegistryImpl創建一個代理,這個代理我們可以推斷出就是后面服務于客戶端的RegistryImpl的Stub對象。然后將UnicastServerRef的skel(skeleton)對象設置為當前RegistryImpl對象。最后用skeleton、stub、UnicastServerRef對象、id和一個boolean值構造了一個Target對象,也就是這個Target對象基本上包含了全部的信息。調用UnicastServerRef的ref(LiveRef)變量的exportObject()方法。 public Remote exportObject(Remote var1
, Object var2
, boolean var3)
throws RemoteException {Class var4 = var1.getClass()
;
Remote var5
;
try {var5 = Util.createProxy(var4
, this .getClientRef()
, this .forceStubUse)
;
}
catch (IllegalArgumentException var7) {
throw new ExportException(
"remote object implements illegal remote interface" , var7)
;
}
if (var5
instanceof RemoteStub) {
this .setSkeleton(var1)
;
}Target var6 =
new Target(var1
, this, var5
, this .ref.getObjID()
, var3)
;
this .ref.exportObject(var6)
;
this .hashToMethod_Map = (Map)hashToMethod_Maps.get(var4)
;
return var5
;
}
到上面為止,我們看到的都是一些變量的賦值和創建工作,還沒有到連接層,這些引用對象將會被Stub和Skeleton對象使用。接下來就是連接層上的了。追溯LiveRef的exportObject()方法,很容易找到了TCPTransport的exportObject()方法。這個方法做的事情就是將上面構造的Target對象暴露出去。調用TCPTransport的listen()方法,listen()方法創建了一個ServerSocket,并且啟動了一條線程等待客戶端的請求。接著調用父類Transport的exportObject()將Target對象存放進ObjectTable中。 public void exportObject(Target var1)
throws RemoteException {
synchronized (
this ) {
this .listen()
;
++
this .exportCount
;
}
boolean var2 =
false;
boolean var12 =
false;
try {var12 =
true;
super .exportObject(var1)
;
var2 =
true;
var12 =
false;
}
finally {
if (var12) {
if (!var2) {
synchronized (
this ) {
this .decrementExportCount()
;
}}}}
if (!var2) {
synchronized (
this ) {
this .decrementExportCount()
;
}}}
到這里,我們已經將RegistryImpl對象創建并且起了服務等待客戶端的請求。
IOperation iOperation= (
IOperation ) Naming.
lookup (
"rmi://127.0.0.1:1099/Operation" )
;
從上面的代碼看起,容易追溯到LocateRegistry的getRegistry()方法。這個方法做的事情是通過傳入的host和port構造RemoteRef對象,并創建了一個本地代理??梢酝ㄟ^Debug功能發現,這個代理對象其實是RegistryImpl_Stub對象。這樣客戶端便有了服務端的RegistryImpl的代理(取決于ignoreStubClasses變量)。但注意此時這個代理其實還沒有和服務端的RegistryImpl對象關聯,畢竟是兩個VM上面的對象,這里我們也可以猜測,代理和遠程的Registry對象之間是通過socket消息來完成的。
public static Registry
getRegistry (String host
, int port
,
RMIClientSocketFactory csf)
throws RemoteException
{Registry registry =
null;
if (port <=
0 )port = Registry.
REGISTRY_PORT ;
if (host ==
null || host.length() ==
0 ) {
// If host is blank (as returned by "file:" URL in 1.0.2 used in
// java.rmi.Naming), try to convert to real local host name so
// that the RegistryImpl's checkAccess will not fail.
try {host = java.net.InetAddress.
getLocalHost ().getHostAddress()
;
}
catch (Exception e) {
// If that failed, at least try "" (localhost) anyway...
host =
"" ;
}}
LiveRef liveRef =
new LiveRef(
new ObjID(ObjID.
REGISTRY_ID )
,
new TCPEndpoint(host
, port
, csf
, null )
,
false )
;
RemoteRef ref =(csf ==
null ) ?
new UnicastRef(liveRef) :
new UnicastRef2(liveRef)
;
return (Registry) Util.
createProxy (RegistryImpl.
class, ref
, false )
;
}
從OperationImpl的構造函數看起。調用了父類UnicastRemoteObject的構造方法,追溯到UnicastRemoteObject的私有方法exportObject()。這里做了一個判斷,判斷服務的實現是不是UnicastRemoteObject的子類,如果是,則直接賦值其ref(RemoteRef)對象為傳入的UnicastServerRef對象。反之則調用UnicastServerRef的exportObject()方法。這里我們是第一種情況。
private static Remote
exportObject (Remote obj
, UnicastServerRef sref)
throws RemoteException
{
// if obj extends UnicastRemoteObject, set its ref.
if (obj
instanceof UnicastRemoteObject) {((UnicastRemoteObject) obj).
ref = sref
;
}
return sref.exportObject(obj
, null, false )
;
}
將服務實現綁定到服務端的Registry上,使得客戶端只需與Registry交互。
Naming .
rebind (
"rmi://127.0.0.1:1099/Operation" , iOperation)
; 從上面這行代碼開始看,容易發現Naming的方法全部都是調用的Registry的方法。這里通過host和port找到我們第一步啟動的服務端Registry服務對象,追溯到其rebind()方法,可以看到,其實做的事情很是簡單,就是把名字和服務實現存進一個Map里面。
public void rebind(String var1
, Remote var2)
throws RemoteException
, AccessException {checkAccess(
"Registry.rebind" )
;
this .bindings.put(var1
, var2)
;
}
接下來就是重頭戲了,從下面代碼看起。
IOperation iOperation= (
IOperation ) Naming.
lookup (
"rmi://127.0.0.1:1099/Operation" )
;
追溯下去,獲取到遠程Registry對象的代理對象之后,調用RegistryImpl_Stub的lookUp()方法。主要代碼如下。做的事情是利用上面通過服務端host和port等信息創建的RegistryImpl_stub對象構造RemoteCall調用對象,operations參數中是各個Registry中聲明的操作,2指明了是lookUp()操作。接下來分步驟看看...
try {RemoteCall var2 =
super .ref.newCall(
this, operations
, 2 , 4905912898345647071L )
;
try {ObjectOutput var3 = var2.getOutputStream()
;
var3.writeObject(var1)
;
}
catch (IOException var18) {
throw new MarshalException(
"error marshalling arguments" , var18)
;
}
super .ref.invoke(var2)
;
Remote var23
;
try {ObjectInput var6 = var2.getInputStream()
;
var23 = (Remote)var6.readObject()
;
}
catch (IOException var15) {
throw new UnmarshalException(
"error unmarshalling return" , var15)
;
}
catch (ClassNotFoundException var16) {
throw new UnmarshalException(
"error unmarshalling return" , var16)
;
}
finally {
super .ref.done(var2)
;
}
return var23
;
}
調用 RegistryImpl_Stub的ref(RemoteRef)對象的newCall()方法,將RegistryImpl_Stub對象傳了進去,不要忘了構造它的時候我們將服務器的主機端口等信息傳了進去,也就是我們把服務器相關的信息也傳進了newCall()方法。newCall()方法做的事情簡單來看就是建立了跟遠程RegistryImpl的Skeleton對象的連接。(不要忘了上面我們說到過服務端通過TCPTransport的exportObject()方法等待著客戶端的請求)
public RemoteCall newCall(RemoteObject var1
, Operation[] var2
, int var3
, long var4)
throws RemoteException {clientRefLog.log(Log.BRIEF
, "get connection" )
;
Connection var6 =
this .ref.getChannel().newConnection()
;
try {clientRefLog.log(Log.VERBOSE
, "create call context" )
;
if (clientCallLog.isLoggable(Log.VERBOSE)) {
this .logClientCall(var1
, var2[var3])
;
}StreamRemoteCall var7 =
new StreamRemoteCall(var6
, this .ref.getObjID()
, var3
, var4)
;
try {
this .marshalCustomCallData(var7.getOutputStream())
;
}
catch (IOException var9) {
throw new MarshalException(
"error marshaling custom call data" )
;
}
return var7
;
}
catch (RemoteException var10) {
this .ref.getChannel().free(var6
, false )
;
throw var10
;
}
}
連接建立之后自然就是發送請求了。我們知道客戶端終究只是擁有Registry對象的代理,而不是真正地位于服務端的Registry對象本身,他們位于不同的虛擬機實例之中,無法直接調用。必然是通過消息進行交互的??纯磗uper.ref.invoke()這里做了什么?容易追溯到StreamRemoteCall的executeCall()方法??此票镜卣{用,但其實很容易從代碼中看出來是通過tcp連接發送消息到服務端。由服務端解析并且處理調用。
try {
if (
this .out !=
null ) {var2 =
this .out.getDGCAckHandler()
;
}
this .releaseOutputStream()
;
DataInputStream var3 =
new DataInputStream(
this .conn.getInputStream())
;
byte var4 = var3.readByte()
;
if (var4 !=
81 ) {
if (Transport.transportLog.isLoggable(Log.BRIEF)) {Transport.transportLog.log(Log.BRIEF
, "transport return code invalid: " + var4)
;
}
throw new UnmarshalException(
"Transport return code invalid" )
;
}
this .getInputStream()
;
var1 =
this .in.readByte()
;
this .in.readID()
;
}
至此,我們已經將客戶端的服務查詢請求發出了。
這里我用的方法是直接斷點在服務端的Thread的run()方法中,因為我們知道服務端已經用線程跑起了服務(當然我是先斷點在Registry_Impl的lookUp()方法并查找調用棧找到源頭的)。一步一步我們找到了Transport的serviceCall()方法,這個方法是關鍵。瞻仰一下主要的代碼,到ObjectTable.getTarget()為止做的事情是從socket流中獲取ObjId,并通過ObjId和Transport對象獲取Target對象,這里的Target對象已經是服務端的對象。再借由Target的派發器Dispatcher,傳入參數服務實現和請求對象RemoteCall,將請求派發給服務端那個真正提供服務的RegistryImpl的lookUp()方法,這就是Skeleton移交給具體實現的過程了,Skeleton負責底層的操作。
try {ObjID var40
;
try {var40 = ObjID.read(var1.getInputStream())
;
}
catch (IOException var34) {
throw new MarshalException (
"unable to read objID" , var34)
;
}Transport var41 = var40.equals(dgcID)?
null :
this;
Target var5 = ObjectTable.getTarget(
new ObjectEndpoint(var40
, var41))
;
final Remote var38
;
if (var5 !=
null && (var38 = var5.getImpl()) !=
null ) {
final Dispatcher var6 = var5.getDispatcher()
;
var5.incrementCallCount ()
;
boolean var8
;
try {transportLog.log(Log.VERBOSE
, "call dispatcher" )
;
final AccessControlContext var7 = var5.getAccessControlContext()
;
ClassLoader var42 = var5.getContextClassLoader()
;
Thread var9 = Thread.currentThread()
;
ClassLoader var10 = var9.getContextClassLoader()
;
try {var9.setContextClassLoader(var42)
;
currentTransport.set(
this )
;
try {AccessController.doPrivileged(
new PrivilegedExceptionAction() {
public Void run()
throws IOException {Transport.
this .checkAcceptPermission(var7)
;
var6.dispatch(var38
, var1)
;
return null;
}}
, var7)
;
return true;
}
catch (PrivilegedActionException var32) {
throw (IOException)var32.getException()
;
}}
finally {var9.setContextClassLoader(var10)
;
currentTransport.set((Object)
null )
;
}}
catch (IOException var35) {transportLog.log(Log.BRIEF
, "exception thrown by dispatcher: " , var35)
;
var8 =
false;
}
finally {var5.decrementCallCount()
;
}
return var8
;
}
throw new NoSuchObjectException (
"no such object in table" )
;
}
看看RegistryImpl的lookUp()實現。做了同步控制,并通過服務名從Map中取出服務對象。返回給客戶端。還記得我們在bindings中存放的其實是OperationImpl的真正實現,并非是Stub對象。
public Remote lookup(String var1)
throws RemoteException
, NotBoundException {Hashtable var2 =
this .bindings
;
synchronized (
this .bindings) {Remote var3 = (Remote)
this .bindings.get(var1)
;
if (var3 ==
null ) {
throw new NotBoundException(var1)
;
}
else {
return var3
;
}}
}
客戶端獲取通過lookUp()查詢獲得的客戶端OperationImpl的Stub對象
這里就不多說了。。多說無益。心好累。憑什么服務端返回給客戶端的是服務的實現,但是客戶端獲取到的是Stub對象呢?用同樣的斷點的方法,我們可以發現問題出在MarshalInputStream的resolveProxyClass()上,里面其實也是創建了一個代理。這就是那個Stub類。
到目前為止,客戶端已經有了Stub對象。就可以和服務端進行愉快地交流了。細心的朋友可能發現這個例子中的服務實現OperationImpl繼承了UnicastRemoteObject,就像前面說的,它似乎不會像RegistryImpl一樣在服務端生成Skeleton對象。(對于非UnicastRemoteObject的則會生成Skeleton沒啥爭議)。我的理解是必然會進行一些處理生成Skeleton對象。因為Registry只是用來查找服務,最終調用服務還是得要客戶端與服務的連接。這個連接必然由Skeleton為我們屏蔽了。
小結:前面我們做了很多工作,大量工作用于起Registry服務和如何查找客戶端需要調用的服務。但事實上,這個Registry可以服務于很多的其他服務。一旦客戶端和服務端通過Stub和Skeleton建立了socket連接,后面的操作直接通過這個連接完成就結了!
五、看看Skeleton和Stub如何為我們屏蔽底層連接細節
Stub類:
public ? class ?Person_Stub? implements ?Person?{???????? ????private ?Socket?socket;???????? ????public ?Person_Stub()? throws ?Throwable?{???????? ?????????? ????????socket?=?new ?Socket( "computer_name" ,? 9000 );???????? ????}???????? ????public ? int ?getAge()? throws ?Throwable?{???????? ?????????? ????????ObjectOutputStream?outStream?=???????? ????????????new ?ObjectOutputStream(socket.getOutputStream());???????? ????????outStream.writeObject("age" );???????? ????????outStream.flush();???????? ????????ObjectInputStream?inStream?=???????? ????????????new ?ObjectInputStream(socket.getInputStream());???????? ????????return ?inStream.readInt();???????? ????}???????? ????public ?String?getName()? throws ?Throwable?{???????? ?????????? ????????ObjectOutputStream?outStream?=???????? ????????????new ?ObjectOutputStream(socket.getOutputStream());???????? ????????outStream.writeObject("name" );???????? ????????outStream.flush();???????? ????????ObjectInputStream?inStream?=???????? ????????????new ?ObjectInputStream(socket.getInputStream());???????? ????????return ?(String)inStream.readObject();???????? ????}?? } ?? 可以看到,Stub對象做的事情是建立到服務端Skeleton對象的Socket連接。將客戶端的方法調用轉換為字符串標識傳遞給Skeleton對象。并且同步阻塞等待服務端返回結果。
Skeleton類:
public ? class ?Person_Skeleton? extends ?Thread?{???????? ????private ?PersonServer?myServer;???????? ????public ?Person_Skeleton(PersonServer?server)?{???????? ?????????? ????????this .myServer?=?server;???????? ????}???????? ????public ? void ?run()?{???????? ????????try ?{???????? ?????????????? ????????????ServerSocket?serverSocket?=?new ?ServerSocket( 9000 );???????? ?????????????? ????????????Socket?socket?=?serverSocket.accept();???????? ????????????while ?(socket?!=? null )?{???????? ?????????????????? ????????????????ObjectInputStream?inStream?=???????? ????????????????????new ?ObjectInputStream(socket.getInputStream());???????? ????????????????String?method?=?(String)inStream.readObject();???????? ?????????????????? ????????????????if ?(method.equals( "age" ))?{???????? ?????????????????????? ????????????????????int ?age?=?myServer.getAge();???????? ????????????????????ObjectOutputStream?outStream?=???????? ????????????????????????new ?ObjectOutputStream(socket.getOutputStream());???????? ?????????????????????? ????????????????????outStream.writeInt(age);???????? ????????????????????outStream.flush();???????? ????????????????}???????? ????????????????if (method.equals( "name" ))?{???????? ?????????????????????? ????????????????????String?name?=?myServer.getName();???????? ????????????????????ObjectOutputStream?outStream?=???????? ????????????????????????new ?ObjectOutputStream(socket.getOutputStream());???????? ?????????????????????? ????????????????????outStream.writeObject(name);???????? ????????????????????outStream.flush();???????? ????????????????}???????? ????????????}???????? ????????}?catch (Throwable?t)?{???????? ????????????t.printStackTrace();???????? ????????????System.exit(0 );???????? ????????}???????? ????}????????????? } ? Skeleton對象做的事情是將服務實現傳入構造參數,獲取客戶端通過socket傳過來的方法調用字符串標識,將請求轉發到具體的服務上面。獲取結果之后返回給客戶端。
六、總結:
本來是Java一個很簡單的用法,用了將近3天看了這部分的內容,感覺收獲還是比較大的。阿里實習的時候,一個師兄曾經說過,看源代碼需要看到什么程度?老實說這玩意看起來真的太累了。??偹闶强赐炅?。我覺得這是走向分布式的比較重要的一步。由于篇幅的關系,沒有涉及到TCP連接的細節。有興趣的可以看看源碼。
“看到你覺得你能說服自己就可以了”。
七、附注:
這里參考了這篇文章以及其引用的各篇文章。 《java RMI原理詳解 》
致敬和感謝!
總結
以上是生活随笔 為你收集整理的深究Java中的RMI底层原理 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。