Java Elastic Job動態(tài)添加任務實現過程解析
背景
在使用Elastic-Job的過程中,有很多人遇到了這么一個問題,就是如何動態(tài)的去添加任務?
在官方的文檔中也有對此作出回答,如下:
動態(tài)添加作業(yè)這個概念每個人理解不盡相同。
elastic-job-lite為jar包,由開發(fā)或運維人員負責啟動。啟動時自動向注冊中心注冊作業(yè)信息并進行分布式協(xié)調,因此并不需要手工在注冊中心填寫作業(yè)信息。 但注冊中心與作業(yè)部署機無從屬關系,注冊中心并不能控制將單點的作業(yè)分發(fā)至其他作業(yè)機,也無法將遠程服務器未啟動的作業(yè)啟動。elastic-job-lite并不會包含ssh免密管理等功能。
elastic-job-cloud為mesos框架,由mesos負責作業(yè)啟動和分發(fā)。 但需要將作業(yè)打包上傳,并調用elastic-job-cloud提供的REST API寫入注冊中心。 打包上傳屬于部署系統(tǒng)的范疇elastic-job-cloud并未涉及。
綜上所述,elastic-job已做了基本動態(tài)添加功能,但無法做到真正意義的完全自動化添加。
接下來談談我對動態(tài)任務的理解,我眼中的動態(tài)任務分為2種:
一種是全新的任務,包括實現的邏輯也是全新的,也就是當我們的程序打成一個jar包后,線上已經在運行了,這個時候我加了一個新的任務,如何能做到不停服務,將這個任務集成到已有的任務中去,這個實現起來難度比較大,涉及到Java類的熱加載等,不過最近阿里又有一開源大作JarsLink,GitHub地址:https://github.com/alibaba/jarslink,可以支持在運行時動態(tài)加載到系統(tǒng)中,實現不需要重啟和發(fā)布系統(tǒng)新增功能。還有一種實現思路我們可以利用Groovy腳本來做這樣的事情,一般情況下重啟來發(fā)布新的任務會比較常見,如果各位一定要實現動態(tài)的任務可以自己嘗試著去研究下我提供的思路。
另一種就是執(zhí)行的業(yè)務邏輯不變,只是運行的時間發(fā)生變化。比如文章的定時發(fā)布,可以設置文章在某天的某分鐘進行自動發(fā)布,實現這個功能有多種方式,你可以不停的掃描任務,一到時間點就自動發(fā)布,比較優(yōu)雅的方式就是為每篇文章的自動發(fā)布都設置一個任務,通過Cron表達式來指定執(zhí)行時間,不同的是每個任務都有自己的參數,業(yè)務邏輯都是固定的定時發(fā)布。接下來我給大家介紹下Elastic-Job實現上面講的第二種動態(tài)任務的方式,也就是任務的實現邏輯已經是存在的,只是需要發(fā)布成多個不同時間去觸發(fā)的任務。
實戰(zhàn)
實現任務的動態(tài)添加比較簡單,只需要接收任務的信息,然后初始化一下就可以了,在實現的過程中筆者遇到了一個麻煩的問題?
在多節(jié)點分片任務卻只有一個節(jié)點能執(zhí)行,問題原因在于當有任務A和任務B,2個節(jié)點的時候,我們調用A節(jié)點的接口進行任務的動態(tài)添加,在A節(jié)點中初始化了任務調度器,數據也存儲到了注冊中心,但是B節(jié)點是不知道有新的任務添加,默認的使用方法是每個節(jié)點在啟動時去初始化任務調度器,而我們的B節(jié)點已經啟動過了,任務是新添加的。
解決這個問題最簡單的方式就是將任務的節(jié)點都集中管理起來,無論動態(tài)任務在哪個節(jié)點上進行注冊,都需要將這個請求轉發(fā)到其他的節(jié)點上進行初始化操作,這樣就可以保證多節(jié)點分片的任務正常執(zhí)行。
還有一種對使用者更友好的辦法是對Zookeeper中的節(jié)點進行監(jiān)聽,當有新的節(jié)點創(chuàng)建時,就自動獲取這個節(jié)點的配置信息,在本地進行任務初始化,通過這樣的方式就可以不用去轉發(fā)請求到其他節(jié)點了,只要在任何節(jié)點有添加操作,都能被監(jiān)聽到,并自己去初始化。
監(jiān)控代碼如下:
/** * 開啟任務監(jiān)聽,當有任務添加時,監(jiān)聽zk中的數據增加,自動在其他節(jié)點也初始化該任務 */public void monitorJobRegister() { CuratorFramework client = zookeeperRegistryCenter.getClient(); @SuppressWarnings('resource') PathChildrenCache childrenCache = new PathChildrenCache(client, '/', true); PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { ChildData data = event.getData(); switch (event.getType()) { case CHILD_ADDED: String config = new String(client.getData().forPath(data.getPath() + '/config')); Job job = JsonUtils.toBean(Job.class, config); addJob(job); break; default: break; } } }; childrenCache.getListenable().addListener(childrenCacheListener); try { childrenCache.start(StartMode.POST_INITIALIZED_EVENT); } catch (Exception e) { e.printStackTrace(); } }
為了方便大家使用,我將動態(tài)添加任務的功能集成到了我之前的elastic-job-spring-boot-starter(https://github.com/yinjihuan/elastic-job-spring-boot-starter)中集成了動態(tài)添加的邏輯,大家引入依賴即可使用。
使用方式比較簡單,只需要在啟動類上加一個ComponentScan注解,讓Spring能夠掃描到elastic-job-spring-boot-starter提供的代碼即可:
@SpringBootApplication@EnableElasticJob//開啟動態(tài)任務添加API@ComponentScan(basePackages = {'com.cxytiandi'})public class JobApplication { public static void main(String[] args) { new SpringApplicationBuilder().sources(JobApplication.class).web(true).run(args); try { new CountDownLatch(1).await(); } catch (InterruptedException e) { } }}
配置好之后,啟動項目就可以通過REST API來動態(tài)的注冊任務,API列表如下:
/job添加任務是POST請求,數據格式為JSON體提交,格式如下:{'jobName':'DynamicJob13','cron':'0 33 16 ?','jobType':'SIMPLE','jobClass':'com.cxytiandi.job.demo.DynamicJob','jobParameter':'2222222','shardingTotalCount':1}
完整字段請參考:
https://github.com/yinjihuan/elastic-job-spring-boot-starter/blob/master/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/bean/Job.java
注意:jobClass必須事先存在于服務中* /job/remove
刪除任務是GET請求,參數只要任務名稱即可,比如:/job/remove?jobName=任務名。可以用于任務完成之后清空注冊中心的任務信息。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持好吧啦網。
相關文章: