vert.x 分布式锁_使用Vert.x进行响应式开发
vert.x 分布式鎖
最近,似乎我們正在聽到有關Java的最新和最好的框架的消息。 Ninja , SparkJava和Play等工具; 但是每個人都固執己見,使您感到需要重新設計整個應用程序以利用它們的出色功能。 這就是為什么當我發現Vert.x時令我感到寬慰的原因。 Vert.x不是一個框架,它是一個工具包,它不受質疑,而且正在解放。 Vert.x不想讓您重新設計整個應用程序以使用它,它只是想讓您的生活更輕松。 您可以在Vert.x中編寫整個應用程序嗎? 當然! 您可以將Vert.x功能添加到現有的Spring / Guice / CDI應用程序中嗎? 是的 您可以在現有JavaEE應用程序中使用Vert.x嗎? 絕對! 這就是讓它變得驚人的原因。
背景
Vert.x誕生于Tim Fox決定他喜歡NodeJS生態系統中正在開發的許多東西,但他不喜歡在V8中進行權衡取舍:單線程,有限的庫支持以及JavaScript本身。 Tim著手編寫一個對如何使用它以及如何使用它沒有質疑的工具箱,他決定在JVM上實現它的最佳位置。 因此,Tim和社區開始著手創建一個事件驅動的,非阻塞的,React性的工具包,該工具包在許多方面都可以反映NodeJS可以完成的工作,而且還利用了JVM內部的強大功能。 Node.x誕生了,后來發展成為Vert.x。
總覽
Vert.x旨在實現事件總線,該事件總線使應用程序的不同部分可以以非阻塞/線程安全的方式進行通信。 它的一部分是根據Eralng和Akka展示的Actor方法建模的。 它還旨在充分利用當今的多核處理器和高度并發的編程需求。 因此,默認情況下,所有Vert.x VERTICLES默認都實現為單線程。 與NodeJS不同,Vert.x可以在許多線程中運行許多頂點。 另外,您可以指定某些頂點為“工作”頂點,并且可以是多線程的。 為了給蛋糕錦上添花,Vert.x通過使用Hazelcast對事件總線的多節點群集提供了底層支持。 它繼續包含許多其他令人驚奇的功能,這些功能太多了,無法在此處列出,但是您可以在Vert.x官方文檔中內容。
關于Vert.x,您需要了解的第一件事是,與NodeJS一樣,永遠不要阻塞當前線程。 默認情況下,Vert.x中的所有內容都設置為使用回調/未來/承諾。 Vert.x不執行同步操作,而是提供異步方法來執行大多數I / O和處理器密集型操作,這些操作可能會阻塞當前線程。 現在,使用回調可能很丑陋且很痛苦,因此Vert.x可以選擇提供基于RxJava的API,該API使用Observer模式實現相同的功能。 最后,Vert.x通過在許多異步API上提供executeBlocking(Function f)方法,可以輕松使用現有的類和方法。 這意味著您可以選擇喜歡使用Vert.x的方式,而不是由工具包指示必須如何使用它。
了解Vert.x的第二件事是它由頂點,模塊和節點組成。 頂點是Vert.x中最小的邏輯單元,通常由單個類表示。 遵循UNIX Philosophy的原則,頂點應該是簡單且具有單一用途的。 一組頂點可以放到一個模塊中,該模塊通常打包為單個JAR文件。 一個模塊代表一組相關的功能,這些功能一起使用時,可以代表整個應用程序,也可以代表較大的分布式應用程序的一部分。 最后,節點是運行一個或多個模塊/垂直模塊的JVM的單個實例。 由于Vert.x具有從頭開始內置的群集功能,因此Vert.x應用程序可以跨越一臺計算機或跨多個地理位置的多臺計算機跨越節點(盡管延遲可能會掩蓋性能)。
示例項目
現在,我最近去過許多聚會和會議,在他們談論React式編程時,它們向您展示的第一件事就是構建一個聊天室應用程序。 很好,但這并不能真正幫助您完全理解響應式開發的力量。 聊天室應用程序既簡單又簡單。 我們可以做得更好。 在本教程中,我們將使用舊版Spring應用程序并將其轉換為利用Vert.x的優勢。 這具有多個目的:表明該工具包易于與現有的Java項目集成,它使我們能夠利用可能是生態系統中根深蒂固的現有工具的優勢,并且使我們遵循DRY原則 ,因為我們不不必重寫大量代碼即可獲得Vert.x的好處。
我們的舊版Spring應用程序是使用Spring Boot,Spring Data JPA和Spring REST的REST API的簡單示例。 源代碼可以在“主”分支中找到該處 。 我們還將使用其他分支來演示進展,因此,只要對git和Java 8有一點經驗的人都可以輕松進行。 讓我們從檢查常規Spring應用程序的Spring Configuration類開始。
@SpringBootApplication @EnableJpaRepositories @EnableTransactionManagement @Slf4j public class Application {public static void main(String[] args) {ApplicationContext ctx = SpringApplication.run(Application.class, args);System.out.println("Let's inspect the beans provided by Spring Boot:");String[] beanNames = ctx.getBeanDefinitionNames();Arrays.sort(beanNames);for (String beanName : beanNames) {System.out.println(beanName);}}@Beanpublic DataSource dataSource() {EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();return builder.setType(EmbeddedDatabaseType.HSQL).build();}@Beanpublic EntityManagerFactory entityManagerFactory() {HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();vendorAdapter.setGenerateDdl(true);LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();factory.setJpaVendorAdapter(vendorAdapter);factory.setPackagesToScan("com.zanclus.data.entities");factory.setDataSource(dataSource());factory.afterPropertiesSet();return factory.getObject();}@Beanpublic PlatformTransactionManager transactionManager(final EntityManagerFactory emf) {final JpaTransactionManager txManager = new JpaTransactionManager();txManager.setEntityManagerFactory(emf);return txManager;} }正如您在課程頂部看到的那樣,我們有一些非常標準的Spring Boot注釋。 你還會看到@ SLF4J批注這是一部分Lombok庫,旨在幫助降低鍋爐板代碼。 我們還有@Bean注釋方法,用于提供對JPA EntityManager,TransactionManager和DataSource的訪問。 這些項目中的每一個都提供可注入的對象,供其他類使用。 項目中的其余類也類似地簡化。 有一個客戶 POJO,它是服務中使用的實體類型。 通過Spring Data創建了一個CustomerDAO 。 最后,有一個CustomerEndpoints類,它是JAX-RS注釋的REST控制器。
如前所述,這是Spring Boot應用程序中的所有標準票價。 該應用程序的問題在于,在大多數情況下,它的可伸縮性有限。 您可以在Servlet容器中運行此應用程序,也可以在Jetty或Undertow之類的嵌入式服務器中運行該應用程序。 無論哪種方式,每個請求都占用一個線程,因此在等待I / O操作時浪費了資源。
切換到Convert-To-Vert.x-Web分支,我們可以看到Application類發生了一些變化。 現在,我們有了一些新的@Bean批注方法來注入Vertx實例本身,以及ObjectMapper實例(Jackson JSON庫的一部分)。 我們還用新的CustomerVerticle替換了CustomerEnpoints類。 幾乎所有其他內容都是相同的。
CustomerVerticle類帶有@Component注釋,這意味著Spring將在啟動時實例化該類。 它還具有用@PostConstruct注釋的start方法,以便在啟動時啟動Verticle。 查看代碼的實際內容,我們看到Vert.x代碼的第一部分: Router 。
Router類是vertx-web庫的一部分,它使我們能夠使用流暢的API來定義HTTP URL,方法和標頭過濾器以進行請求處理。 將BodyHandler實例添加到默認路由可以處理POST / PUT正文并將其轉換為JSON對象,然后Vert.x可以將其作為RoutingContext的一部分進行處理。 Vert.x中的路由順序可能很重要。 如果定義的路由具有某種形式的全局匹配(*或regex),則除非實現chaining ,否則它可能會吞噬在其后定義的路由的請求。 我們的示例最初顯示了3條路線。
@PostConstructpublic void start() throws Exception {Router router = Router.router(vertx);router.route().handler(BodyHandler.create());router.get("/v1/customer/:id").produces("application/json").blockingHandler(this::getCustomerById);router.put("/v1/customer").consumes("application/json").produces("application/json").blockingHandler(this::addCustomer);router.get("/v1/customer").produces("application/json").blockingHandler(this::getAllCustomers);vertx.createHttpServer().requestHandler(router::accept).listen(8080);}請注意,定義了HTTP方法,定義了“ Accept”標頭(通過消耗),定義了“ Content-Type”標頭(通過生產)。 我們還看到我們正在通過對blockingHandler方法的調用傳遞對請求的處理。 Vert.x路由的阻塞處理程序接受RoutingContext對象,因為它是唯一的參數。 RoutingContext保存Vert.x請求對象,響應對象和任何參數/ POST主體數據(例如“:id”)。 您還將看到,我使用方法引用而不是lambda來將邏輯插入blockingHandler(我發現它更具可讀性)。 這3條請求路由的每個處理程序都在該類中一個單獨的方法中定義。 這些方法基本上只是調用DAO上的方法,根據需要進行序列化或反序列化,設置一些響應頭,并通過發送響應來結束請求。 總體而言,非常簡單明了。
private void addCustomer(RoutingContext rc) {try {String body = rc.getBodyAsString();Customer customer = mapper.readValue(body, Customer.class);Customer saved = dao.save(customer);if (saved!=null) {rc.response().setStatusMessage("Accepted").setStatusCode(202).end(mapper.writeValueAsString(saved));} else {rc.response().setStatusMessage("Bad Request").setStatusCode(400).end("Bad Request");}} catch (IOException e) {rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");log.error("Server error", e);}}private void getCustomerById(RoutingContext rc) {log.info("Request for single customer");Long id = Long.parseLong(rc.request().getParam("id"));try {Customer customer = dao.findOne(id);if (customer==null) {rc.response().setStatusMessage("Not Found").setStatusCode(404).end("Not Found");} else {rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(dao.findOne(id)));}} catch (JsonProcessingException jpe) {rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");log.error("Server error", jpe);}}private void getAllCustomers(RoutingContext rc) {log.info("Request for all customers");List customers = StreamSupport.stream(dao.findAll().spliterator(), false).collect(Collectors.toList());try {rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(customers));} catch (JsonProcessingException jpe) {rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");log.error("Server error", jpe);}}您可能會說:“但是,這比我的Spring注釋和類還要更多的代碼和混亂”。 這可能是正確的,但實際上取決于您如何實現代碼。 這只是一個介紹性的示例,因此我使代碼非常簡單易懂。 我可以使用Vert.x的注釋庫以類似于JAX-RS的方式實現端點。 此外,我們還獲得了可擴展性的巨大改進。 在幕后,Vert.x Web使用Netty進行低級異步I / O操作,從而使我們能夠處理更多并發請求(受數據庫連接池的大小限制)。
通過使用Vert.x Web庫,我們已經對該應用程序的可伸縮性和并發性進行了一些改進,但是通過實現Vert.x EventBus ,我們可以做一些改進。 通過將數據庫操作分為Worker Verticles,而不是使用blockingHandler,我們可以更有效地處理請求處理。 這在“ 轉換為工作人員垂直”分支中顯示。 應用程序類保持不變,但是我們更改了CustomerEndpoints類,并添加了一個名為CustomerWorker的新類。 此外,我們添加了一個名為Spring Vert.x Extension的新庫,該庫為Vert.x Verticles提供了Spring Dependency Injections支持。 首先查看新的CustomerEndpoints類。
@PostConstructpublic void start() throws Exception {log.info("Successfully create CustomerVerticle");DeploymentOptions deployOpts = new DeploymentOptions().setWorker(true).setMultiThreaded(true).setInstances(4);vertx.deployVerticle("java-spring:com.zanclus.verticles.CustomerWorker", deployOpts, res -> {if (res.succeeded()) {Router router = Router.router(vertx);router.route().handler(BodyHandler.create());final DeliveryOptions opts = new DeliveryOptions().setSendTimeout(2000);router.get("/v1/customer/:id").produces("application/json").handler(rc -> {opts.addHeader("method", "getCustomer").addHeader("id", rc.request().getParam("id"));vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));});router.put("/v1/customer").consumes("application/json").produces("application/json").handler(rc -> {opts.addHeader("method", "addCustomer");vertx.eventBus().send("com.zanclus.customer", rc.getBodyAsJson(), opts, reply -> handleReply(reply, rc));});router.get("/v1/customer").produces("application/json").handler(rc -> {opts.addHeader("method", "getAllCustomers");vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));});vertx.createHttpServer().requestHandler(router::accept).listen(8080);} else {log.error("Failed to deploy worker verticles.", res.cause());}});}路由相同,但實現代碼不同。 現在,我們不再使用對blockingHandler的調用,而是實現了適當的異步處理程序,該處理程序在事件總線上發送事件。 此Verticle中不再進行任何數據庫處理。 我們已將數據庫處理移至一個工作線程,該線程具有多個實例,以線程安全的方式并行處理多個請求。 我們還為這些事件的回復時間注冊了一個回調,以便我們可以向發出請求的客戶端發送適當的響應。 現在,在CustomerWorker Verticle中,我們已經實現了數據庫邏輯和錯誤處理。
@Override public void start() throws Exception {vertx.eventBus().consumer("com.zanclus.customer").handler(this::handleDatabaseRequest); }public void handleDatabaseRequest(Message<Object> msg) {String method = msg.headers().get("method");DeliveryOptions opts = new DeliveryOptions();try {String retVal;switch (method) {case "getAllCustomers":retVal = mapper.writeValueAsString(dao.findAll());msg.reply(retVal, opts);break;case "getCustomer":Long id = Long.parseLong(msg.headers().get("id"));retVal = mapper.writeValueAsString(dao.findOne(id));msg.reply(retVal);break;case "addCustomer":retVal = mapper.writeValueAsString(dao.save(mapper.readValue(((JsonObject)msg.body()).encode(), Customer.class)));msg.reply(retVal);break;default:log.error("Invalid method '" + method + "'");opts.addHeader("error", "Invalid method '" + method + "'");msg.fail(1, "Invalid method");}} catch (IOException | NullPointerException e) {log.error("Problem parsing JSON data.", e);msg.fail(2, e.getLocalizedMessage());} }CustomerWorker工人垂直服務器在事件總線上注冊消費者以獲取消息。 代表事件總線上地址的字符串是任意的,但是建議使用反向tld樣式的命名結構,以確保地址唯一(“ com.zanclus.customer”)很簡單。 每當有新消息發送到該地址時,它將被傳遞到一個,只有一個工作層。 然后,工作層將調用handleDatabaseRequest來完成數據庫工作,JSON序列化和錯誤處理。
你有它。 您已經看到Vert.x可以集成到舊版應用程序中,以提高并發性和效率,而不必重寫整個應用程序。 我們可以使用現有的Google Guice或JavaEE CDI應用程序完成類似的操作。 當我們在Vert.x中嘗試添加響應功能時,所有業務邏輯都可能保持相對不變。 下一步由您決定。 接下來的一些想法包括Clustering , WebSockets和ReactiveX sugar的VertxRx 。
翻譯自: https://www.javacodegeeks.com/2015/12/reactive-development-using-vert-x.html
vert.x 分布式鎖
總結
以上是生活随笔為你收集整理的vert.x 分布式锁_使用Vert.x进行响应式开发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: javadocs_不会吸引人的JavaD
- 下一篇: 突然讨厌做前端,讨厌代码_不要讨厌HAT