|
| 1 | +#!/usr/bin/env python |
| 2 | +# -*- coding:utf-8 -*- |
| 3 | +# project : xadmin-server |
| 4 | +# filename : channel |
| 5 | +# author : ly_13 |
| 6 | +# date : 3/29/2025 |
| 7 | +import time |
| 8 | + |
| 9 | +from channels_redis.core import RedisChannelLayer as _RedisChannelLayer |
| 10 | + |
| 11 | + |
| 12 | +class RedisChannelLayer(_RedisChannelLayer): |
| 13 | + layer_expire = 30 # 需要心跳方式发送在线状态,否则将channel移除 |
| 14 | + |
| 15 | + async def group_discard(self, group, channel): |
| 16 | + """ |
| 17 | + Removes the channel from the named group if it is in the group; |
| 18 | + does nothing otherwise (does not error) |
| 19 | + """ |
| 20 | + assert self.valid_channel_name(channel), "Channel name not valid" |
| 21 | + connection, key = await self.auto_expire_layers(group) |
| 22 | + await connection.zrem(key, channel) |
| 23 | + |
| 24 | + async def auto_expire_layers(self, group): |
| 25 | + assert self.valid_group_name(group), "Group name not valid" |
| 26 | + key = self._group_key(group) |
| 27 | + connection = self.connection(self.consistent_hash(group)) |
| 28 | + |
| 29 | + # Discard old channels based on group_expiry |
| 30 | + await connection.zremrangebyscore( |
| 31 | + key, min=0, max=int(time.time()) - self.layer_expire |
| 32 | + ) |
| 33 | + |
| 34 | + return connection, key |
| 35 | + |
| 36 | + async def get_layers(self, group): |
| 37 | + connection, key = await self.auto_expire_layers(group) |
| 38 | + return [x.decode("utf8") for x in await connection.zrange(key, 0, -1)] |
| 39 | + |
| 40 | + async def update_active_layers(self, group, channel): |
| 41 | + connection, key = await self.auto_expire_layers(group) |
| 42 | + await connection.zadd(key, {channel: time.time()}) |
| 43 | + await connection.expire(key, self.group_expiry) |
| 44 | + |
| 45 | + async def get_groups(self): |
| 46 | + groups = [] |
| 47 | + group = self._group_key("*") |
| 48 | + for index in range(self.ring_size): |
| 49 | + connection = self.connection(index) |
| 50 | + cursor = 0 |
| 51 | + while True: |
| 52 | + cursor, keys = await connection.scan(cursor, match=group) |
| 53 | + for key in keys: |
| 54 | + groups.append(key.decode("utf8").split(":")[-1]) |
| 55 | + if cursor == 0: |
| 56 | + break |
| 57 | + return groups |
0 commit comments