当前位置: 首页 > news >正文

zookeeper应用之分布式队列

队列这种数据结构都不陌生,特点就是先进先出。有很多常用的消息中间件可以有现成的该部分功能,这里使用zookeeper基于发布订阅模式来实现分布式队列。对应的会有一个生产者和一个消费者。

这里理论上还是使用顺序节点。生产者不断产生新的顺序子节点,消费者watcher监听节点新增事件来消费消息。

生产者:

CuratorFramework client = ...
client.start();
String path = "/testqueue";
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path,"11".getBytes())

消费者:

CuratorFramework client = ...
client.start();
String path = "/testqueue";
PathChildrenCache pathCache = new PathChildrenCache(client,path,true);
pathCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED){ChildData data = event.getData();//handle msgclient.delete().forPath(data.getPath());}}
});
pathCache.start();

使用curator queue:

先来使用基本的队列类DistributedQueue。

DistributedQueue的初始化需要提交准备几个参数:

client连接就不多说了:

CuratorFramework client = ...

QueueSerializer:这个主要是用来指定对消息data进行序列化和反序列化

这里就搞一个简单的字符串类型:

QueueSerializer<String> serializer = new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}
};

QueueConsumer消息consumer,当有新消息来的时候会调用consumer.consumeMessage()来处理消息

这里也搞个简单的string类型的处理consumer

QueueConsumer<String> consumer = new QueueConsumer<String>() {@Overridepublic void consumeMessage(String s) throws Exception {System.out.println("receive msg:"+s);}@Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {//TODO}
};

队列消息发布:

//队列节点路径
String queuePath = "/queue";
//使用上面准备的几个参数构造DistributedQueue对象
DistributedQueue<String> queue =  QueueBuilder.builder(client,consumer,serializer,queuePath).buildQueue();
queue.start();
//调用put方法生产消息
queue.put("hello");
queue.put("msg");
Thread.sleep(2000);
queue.put("3");

这样在启动测试程序在,consumer的consumeMessage方法就会收到queue.put的消息。

这里有个问题有没有发现,在初始化queue的时候需要指定consumer,那岂不是只能同一个程序中生产消费,何来的分布式?

其实这里在queue对象创建的时候consumer可以为null,这个时候queue就只生产消息。具体的逻辑需要看下DistributedQueue类的源码。

在DistributedQueue类的构造函数有一步设置isProducerOnly属性

isProducerOnly = (consumer == null);

然后在start()方法会根据isProducerOnly来判断启动方式

if ( !isProducerOnly || (maxItems != QueueBuilder.NOT_SET) )
{childrenCache.start();
}if ( !isProducerOnly )
{service.submit(new Callable<Object>(){@Overridepublic Object call(){runLoop();return null;}});
}

这里看到consumer为空,两个if不成立,不会初始化对那个的消息消费逻辑wather监听。只需要在另一个程序里创建queue启动时指定consumer即可。

源码分析

先从消息的发布也就是put方法

首先调用makeItemPath()获取创建节点路径:

ZKPaths.makePath(queuePath, QUEUE_ITEM_NAME);

这里QUEUE_ITEM_NAME=“queue-”。

然后调用internalPut()方法来创建节点路径

//先累加消息数量putCount
putCount.incrementAndGet();
//使用serializer序列化消息数据
byte[]              bytes = ItemSerializer.serialize(multiItem, serializer);
//根据background来创建节点
if ( putInBackground )
{doPutInBackground(item, path, givenMultiItem, bytes);
}
else
{doPutInForeground(item, path, givenMultiItem, bytes);
}

看doPutInForeground里就是具体的创建节点了

//创建节点
client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, bytes);
//哦,错了这里putCount不是总消息数,是正在创建消息数,创建完再回减
synchronized(putCount)
{putCount.decrementAndGet();putCount.notifyAll();
}//如果有对应的lisener依次调用
putListenerContainer.forEach(listener -> {if ( item != null ){listener.putCompleted(item);}else{listener.putMultiCompleted(givenMultiItem);}
});

消息的发布就完成了。

然后是消息的consumer,这里肯定是使用的watcher。这里还是回到前面start方法处根据isProducerOnly属性判断有两步操作:

1、childrenCache.start();

childrenCache初始化是在queue的构造函数里

childrenCache = new ChildrenCache(client, queuePath)

其start方法会调用

private final CuratorWatcher watcher = new CuratorWatcher()
{@Overridepublic void process(WatchedEvent event) throws Exception{if ( !isClosed.get() ){sync(true);}}
};private final BackgroundCallback  callback = new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( event.getResultCode() == KeeperException.Code.OK.intValue() ){setNewChildren(event.getChildren());}}};void start() throws Exception{sync(true);}private synchronized void sync(boolean watched) throws Exception{if ( watched ){//走这里client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);}else{client.getChildren().inBackground(callback).forPath(path);}}

这里先把代码都贴上,看到内部定义了一个watcher和callback。这里inBackground就是watcher到事件使用callback进行处理,最后是调用到setNewChildren方法

private synchronized void setNewChildren(List<String> newChildren)
{if ( newChildren != null ){Data currentData = children.get();//将数据设置到children变量里,消息版本+1children.set(new Data(newChildren, currentData.version + 1));//notifyAll() 等待线程获取消息notifyFromCallback();}
}

这里有引入了一个children变量,然后将数据设置到了该变量里。

private final AtomicReference<Data> children = new AtomicReference<Data>(new Data(Lists.<String>newArrayList(), 0));

children其实是线程间通信一个共享数据容器变量。这里设置了数据,然后具体的数据消费在下一步。

2、线程池里丢了个任务去执行runLoop();方法。

回到DistributedQueue.start的第二步,执行runLoop()方法,看名字就应该知道了一直轮询获取消息。

还是来看代码吧

private void runLoop()
{long         currentVersion = -1;long         maxWaitMs = -1;//while一直轮询while ( state.get() == State.STARTED  ){try{//从childrenCache里获取数据ChildrenCache.Data      data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);currentVersion = data.version;List<String>        children = Lists.newArrayList(data.children);sortChildren(children); // makes sure items are processed in the correct orderif ( children.size() > 0 ){maxWaitMs = getDelay(children.get(0));if ( maxWaitMs > 0 ){continue;}}else{continue;}/**处理数据 这里取出消息后会删除节点,然后使用serializer反序列化节点数据,调用consumer.consumeMessage来处理消息**/processChildren(children, currentVersion);}}}
}

这里获取数据使用了childrenCache.blockingNextGetData

synchronized Data blockingNextGetData(long startVersion, long maxWait, TimeUnit unit) throws InterruptedException
{long            startMs = System.currentTimeMillis();boolean         hasMaxWait = (unit != null);long            maxWaitMs = hasMaxWait ? unit.toMillis(maxWait) : -1;//数据版本没变一直wait等待while ( startVersion == children.get().version ){if ( hasMaxWait ){long        elapsedMs = System.currentTimeMillis() - startMs;long        thisWaitMs = maxWaitMs - elapsedMs;if ( thisWaitMs <= 0 ){break;}wait(thisWaitMs);}else{wait();}}return children.get();
}

这里就有wait阻塞等消息,当消息来时候会被唤醒。

其它类型队列:

curator对优先队列(DistributedPriorityQueue)、延迟队列(DistributedDelayQueue)都有对应的实现,有兴趣的自己看吧。

http://www.lryc.cn/news/239166.html

相关文章:

  • 取数游戏2(动态规划java)
  • Spring Boot中配置文件生效位置
  • AIGC创作系统ChatGPT网站系统源码,支持最新GPT-4-Turbo模型
  • 【JavaEE】操作系统与进程
  • 【MATLAB源码-第86期】基于matlab的QC-LDPC码性能仿真,输出误码率曲线。
  • 【0236】聊一聊PG内核中的命令标签(Command Tags、CommandTag、tag_behavior)
  • Python武器库开发-flask篇之error404(二十七)
  • 录屏软件自动开启录视频,是如何实现的?
  • 模拟shell小程序
  • webpack配置全局scss
  • 想面试前端工程师,必须掌握哪些知识和技能?【云驻共创】
  • 京东数据分析(京东数据采集):2023年10月京东平板电视行业品牌销售排行榜
  • 在 Linux 中,可以使用分号 (;) 或者 运算符来执行多条命令
  • 一些必备的 Redis 命令 | Navicat
  • 神经网络常用激活函数详解
  • UVA11584划分成回文串 Partitioning by Palindromes
  • 第十一章 将对象映射到 XML - 控制流属性的映射形式
  • torchvision中的标准ResNet50网络结构
  • Java 多线程之 synchronized (互拆锁/排他锁/非观锁)
  • 开源vs闭源大模型如何塑造技术的未来?开源模型的优劣势未来发展方向
  • 如何使用无代码系统搭建软件平台?有哪些开源无代码开发平台?
  • 微信怎么设置自动回复?
  • 基于Vue3的低代码开发平台——JNPF
  • Thinkphp6 模型 指定字段自增的方法
  • WhatsApp开发客户攻略来袭!还有你不知道的账号解封秘籍!
  • Linux C 基于tcp多线程在线聊天室
  • 代码随想录算法训练营第23期day60|84.柱状图中最大的矩形
  • vue动态获取目录结构进行配置静态路由
  • 产品工程师工作的职责十篇(合集)
  • 图片降噪软件 Topaz DeNoise AI mac中文版功能