三百行代碼完成一個(gè)簡(jiǎn)單的rpc框架-創(chuàng)新互聯(lián)

花了半天的時(shí)間寫了個(gè)簡(jiǎn)單的rpc框架,是因?yàn)槲易畛蹩磀ubbo源碼的時(shí)候發(fā)現(xiàn)dubbo雖然看起來(lái)很龐大,但是隱隱約約總感覺(jué),其實(shí)其絕大多數(shù)功能,都是基于可擴(kuò)張性和服務(wù)治理的需要而編寫的。我看過(guò)dubbo和grpc的源碼,這兩個(gè)都是非常優(yōu)秀的rpc框架,但是為了讓初學(xué)rpc的同學(xué)能夠快速抓住rpc的核心,所以寫了這篇文章,希望看過(guò)的同學(xué),再次去看dubbo的源碼的時(shí)候,能夠抓住這個(gè)核心去看。
一:rpc協(xié)議的接口
RpcProtocol.java

創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),金秀企業(yè)網(wǎng)站建設(shè),金秀品牌網(wǎng)站建設(shè),網(wǎng)站定制,金秀網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營(yíng)銷,網(wǎng)絡(luò)優(yōu)化,金秀網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競(jìng)爭(zhēng)力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長(zhǎng)自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。
public interface RpcProtocol  {
 void export(int port);
 Object refer(Class inrerfaceClass,String host, int port);

}

這個(gè)接口類只提供兩個(gè)接口,那是因?yàn)閷?duì)于一個(gè)rpc框架來(lái)說(shuō),本質(zhì)上就只需要兩個(gè)接口,一個(gè)是consumer引用provider的服務(wù),一個(gè)是provider接收到consumer的請(qǐng)求之后對(duì)外暴露服務(wù)。
下面是具體的實(shí)現(xiàn)。代碼不復(fù)雜,可以直接復(fù)制到idea,慢慢調(diào)試

二:rpc協(xié)議的具體實(shí)現(xiàn)

RpcCore.java

public class RpcCore implements RpcProtocol{

private Socket socket;
private ObjectOutputStream objectOutputStream;
private ObjectInputStream objectInputStream;

private ServerSocket serverSocket;
private Map<String,List<Object>> services=new ConcurrentHashMap<String, List<Object>>();
private Map<String,Map<String,Object>> interfaceAtrributes=new ConcurrentHashMap<>();

@Override
public void export(int port){
    start(port);
}

@Override
public Object refer(final Class interfaceClass,String host, int port){
    connect(host,port);
    return Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass},
            new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    String interfaceName=interfaceClass.getName();
                    String fullName= (String) interfaceAtrributes.get(interfaceName).get("fullName");
                    return get(fullName,method,args);
                }
            });
}

public Object get(String interfaceFullName,Method method,Object[] parames){
    Object result=null;
    try {
        objectOutputStream.writeUTF(interfaceFullName);
        objectOutputStream.writeUTF(method.getName());
        objectOutputStream.writeObject(method.getParameterTypes());
        objectOutputStream.writeObject(parames);
        objectOutputStream.flush();
        result=objectInputStream.readObject();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    } finally {
        try {
            if (objectOutputStream!=null) {
                objectOutputStream.close();
            }
            if (objectInputStream!=null) {
                objectInputStream.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    return result;

}

private void start(int port) {
    try {
        serverSocket = new ServerSocket();
        serverSocket.bind(new InetSocketAddress("localhost", port));
        init();
    } catch (IOException e) {
        e.printStackTrace();
    }
    while (true) {
        System.out.println("server has started success port is --->"+port);
        Socket socket = null;
        try {
            socket = serverSocket.accept();
            new Thread(new Processsor(socket,services)).start();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

public  void init(){
    RpcDemo rpcDemo=new RpcDemoImplProvider();
    String group="rpcDemo";
    String version="1.0.0";
    String fullName=RpcDemo.class.getName()+"&"+group+"&"+version;
    List<Object> rpcDemoInstances=services.get(fullName);
    if (rpcDemoInstances==null){
        rpcDemoInstances=new ArrayList();
        rpcDemoInstances.add(rpcDemo);
    }
    services.put(fullName,rpcDemoInstances);
}

public void connect(String host, int port) {
    try {
        storeInterface();
        socket = new Socket();
        socket.connect(new InetSocketAddress(host, port));
        objectOutputStream=new ObjectOutputStream(socket.getOutputStream());
        objectInputStream=new ObjectInputStream(socket.getInputStream());
    } catch (IOException e) {
        e.printStackTrace();
    }

}

private void storeInterface(){
    String group="rpcDemo";
    String version="1.0.0";
    String fullName=RpcDemo.class.getName()+"&"+group+"&"+version;
    Map<String,Object> attributes=interfaceAtrributes.get(fullName);
    if (attributes==null){
        attributes=new ConcurrentHashMap(100);
        attributes.put("group",group);
        attributes.put("version",version);
        attributes.put("fullName",fullName);
    }
    interfaceAtrributes.put(RpcDemo.class.getName(),attributes);

}

class Processsor implements Runnable {
    private Socket socket;
    private ObjectInputStream objectInputStream;
    private ObjectOutputStream objectOutputStream;
    private Map<String,List<Object>> services;
    private Processsor(Socket socket,Map<String,List<Object>> services) {
        this.socket = socket;
        this.services=services;
    }

    @Override
    public void run() {
        System.out.println((((InetSocketAddress) socket.getRemoteSocketAddress()).getPort()));

        try {
            objectInputStream = new ObjectInputStream(socket.getInputStream());
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            String interfaceFullName=objectInputStream.readUTF();
            String methodName=objectInputStream.readUTF();
            Class[] parameTypes= (Class[]) objectInputStream.readObject();
            Object[] objects= (Object[]) objectInputStream.readObject();
            String interfaceName=interfaceFullName.split("&")[0];
            Class service=Class.forName(interfaceName);
            Method method=service.getMethod(methodName,parameTypes);
            Object instances=services.get(interfaceFullName).get(0);
            Object result = method.invoke(instances, objects);
            objectOutputStream.writeObject(result);
            objectOutputStream.flush();
            objectOutputStream.close();
            objectInputStream.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

}

三:rpc測(cè)試的接口
所謂接口說(shuō)白了就是協(xié)議,與http,mqtt等其他的協(xié)議本質(zhì)上沒(méi)什么區(qū)別,只不過(guò)rpc是強(qiáng)依賴,而后兩個(gè)是弱依賴而已,另外之所以把實(shí)體作為內(nèi)部類,是為了表達(dá)一種思想,rpc的實(shí)體類和異常都是協(xié)議的一部分,應(yīng)該將他們放到一起。
Rpcdemo.java

public interface RpcDemo {
Student getStudent(Integer id,String name);
class Student implements Serializable{
    public long id;
    public String name;
    public int age;
    public boolean man;

    public Student(long id, String name, int age, boolean man) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.man = man;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public boolean isMan() {
        return man;
    }

    public void setMan(boolean man) {
        this.man = man;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", man=" + man +
                '}';
    }
}

}

四:接口的實(shí)現(xiàn):

RpcDemoImplProvider.java

public class RpcDemoImplProvider implements RpcDemo{
public Student getStudent(Integer id,String name){
    return new Student(1234,"zhangsan",20,true);
}

public static void main(String[] args) {
    RpcCore rpcCore=new RpcCore();
    rpcCore.export(8087);
}

}

五:RpcDemoConsumer.java

消費(fèi)端。

public class RpcDemoConsumer {
public static void main(String[] args) {
    RpcCore rpcCore=new RpcCore();
    RpcDemo rpcDemo = (RpcDemo)rpcCore.refer(RpcDemo.class, "127.0.0.1", 8087);

    System.out.println(" 遠(yuǎn)程調(diào)用成功");
    System.out.println("返回的結(jié)果是---->"+rpcDemo.getStudent(111,"zhangsan"));
}

}

六:先啟動(dòng)RpcDemoImplProvider
三百行代碼完成一個(gè)簡(jiǎn)單的rpc框架

七:?jiǎn)?dòng)RpcDemoConsumer
三百行代碼完成一個(gè)簡(jiǎn)單的rpc框架

總共也就250行代碼左右。over

創(chuàng)新互聯(lián)www.cdcxhl.cn,專業(yè)提供香港、美國(guó)云服務(wù)器,動(dòng)態(tài)BGP最優(yōu)骨干路由自動(dòng)選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡(luò)助力業(yè)務(wù)部署。公司持有工信部辦法的idc、isp許可證, 機(jī)房獨(dú)有T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確進(jìn)行流量調(diào)度,確保服務(wù)器高可用性。佳節(jié)活動(dòng)現(xiàn)已開(kāi)啟,新人活動(dòng)云服務(wù)器買多久送多久。

名稱欄目:三百行代碼完成一個(gè)簡(jiǎn)單的rpc框架-創(chuàng)新互聯(lián)
文章出自:http://muchs.cn/article10/dcghgo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供云服務(wù)器、網(wǎng)站內(nèi)鏈軟件開(kāi)發(fā)、關(guān)鍵詞優(yōu)化網(wǎng)站改版、品牌網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)