从零开始开发一个实时博客
suiw9 2024-11-27 20:40 28 浏览 0 评论
后台回复【入门资料】
送你十本Python电子书
本文作者为何世友,科技媒体【爱范儿】的CTO。这是他在前年写的一篇介绍Django Channels 使用的教程,基本思路值得大家参考,但是要注意接口可能有变化,请以最新文档为准。
最终效果
ASGI 、Django Channels 简介
ASGI 的完整说明我在去年做了一个翻译。ASGI 由 Django 团队提出,为了解决在一个网络框架里(如 Django)同时处理 HTTP、HTTP2、WebSocket 协议。为此,Django 团队开发了 Django Channels 插件,为 Django 带来了 ASGI 能力。
在 ASGI 中,将一个网络请求划分成三个处理层面,最前面的一层,interface server(协议处理服务器),负责对请求协议进行解析,并将不同的协议分发到不同的 Channel(频道);频道属于第二层,通常可以是一个队列系统。频道绑定了第三层的 Consumer(消费者)。
比如说,HTTP 协议的频道绑定了 HTTP 的消费者,当有新的 HTTP 请求过来时,interface server 将该请求分发到 HTTP 频道,HTTP 频道绑定的 HTTP 消费者对该请求进行处理,将处理结果返回给 HTTP 频道,最终传回给客户端。
下面,我们用一个实例演示下这种能力。完整源码在 Github-heshiyou/livelog: A Django Channels example project to demonstrate the ASGI use case.。
实时博客的实现
第一步:跑起来
本例基于 Python 3.5.1、macOS 10.12.4 beta 5,理论上 Python 2.7、Linux 可以跑起来(测试工作请自行认领)。
安装依赖:pip install-r requirements.txt
requirements.txt 如下:
django
channels
asgi_redis
feedparser
新建 Django 项目:django-admin startproject livelog
。在 settings.py 里面,将channels
添加到 INSTALLED_APPS,再加上基础的 channels 相关配置:
...
INSTALLED_APPS = (
...
'channels',
)
...
# Redis
REDIS_OPTIONS = {
'HOST': '127.0.0.1',
'PORT': 6379,
'DB': 0
}
USE_REDIS = True
# Channel settings
CHANNEL_LAYERS = {
"default": {
"BACKEND": "asgi_redis.RedisChannelLayer",
"CONFIG": {
"hosts": ['redis://{}:{}'.format(REDIS_OPTIONS['HOST'],
REDIS_OPTIONS['PORT'])]
},
"ROUTING": "livelog.routing.channel_routing"
}
}
注意到使用了 Redis 来做为 Channels 的 Backend。使用 Redis 是为了跨进程的消息处理,为了简便(不需要跨进程),也可以使用 In Memory Channel Layer。接着,我们在 livelog app 目录下添加一个 routing.py 文件,用以配置 channel 路由,现在可以留空:
channel_routing =
现在,通过以下命令即可跑起来我们的第一个 ASGI Server。
# run following commands in command line separately
redis-server /usr/local/etc/redis.conf
python manage.py runserver
可以看到和普通的 Django app server 的启动方式没什么两样,这归功于 Django Channels 封装。实际工作中,要拆解为三个单独的启动步骤:
daphne livelog.asgi:channel_layer --port 8080//daphne 是 asgi interface sever 的一种实现
python manage.py runworker // 启动 ASGI consumer worker
python manage.py runserver --noasgi --noworker // 启动原始的 WSGI server,不运行 ASGI interface server、worker
第二步:处理 WebSocket
Channels 将 WebSocket 连接映射到三个不同到频道:
当一个新的客户端通过 WebSocket 连接时,
websocket.connect
频道将收到一条消息,在本例中,将在此频道将新用户添加到实时博客订阅组里。当一个客户端断开连接,
websocket.disconnect
频道将收到一条消息。每一条消息都将被发往
websocket.receive
频道,在这个频道里。
首先,我们需要将我们的处理逻辑绑定到这三个频道。
在 livelog app 目录中,添加 comsumers.py 文件:
...
defws_connect(message):
message.reply_channel.send({'accept': True})
Group(const.GROUP_NAME).add(message.reply_channel)
defws_disconnect(message):
Group(const.GROUP_NAME).discard(message.reply_channel)
defws_receive(message):
pass
在本例中,使用 WSGI 也即原生 Django 提供 http 服务。在 routing.py 文件中,将逻辑绑定到路由:
...
channel_routing = [
route('websocket.connect', ws_connect),
route('websocket.disconnect', ws_disconnect),
route('websocket.receive', ws_receive)
]
在 ws_connect
频道中,做了两件事:
建立 WebSocket 连接;
将新用户添加到群组里,后面我们将该群组作为实时博客的订阅组,对它发送新的消息。
在 ws_disconnect
频道中,将断开连接的用户移出群组。在ws_receive
频道中,暂时什么也不做,因为本例中,我们不通过被动接受新消息的方式来更新博客。为了较为全面地展示 ASGI 的能力,本例中,我们使用 Background worker(后台进程)来主动更新博客,并给群组用户推送新消息。
第三步:推送消息
更新实时博客的方式有很多种,在这里我们使用一个 RSS Feed 作为内容源,对该内容源定时抓取,将新内容更新到博客中,以此来做一个演示。在此之前,介绍一个 Django 的小特性,这个特性将允许我们这么运行我们自己的后台进程:
python manage.py bloggingworker // bloggingworker 是我们自定义的命令
这个特新就是 Django 的自定义命令
具体做法在此不赘述,看文档或看本例完整源码即可得知。下面我们回到正事儿上:
blogging_worker.py
# -*- coding: utf-8 -*-
...
classCommand(BaseCommand):
"""
Command to start blogging worker from command line.
"""
help = 'Fetch and parse RSS feed and send over channel'
defhandle(self, *args, **options):
rc = redis.Redis(host=settings.REDIS_OPTIONS['HOST'],
port=settings.REDIS_OPTIONS['PORT'],
db=settings.REDIS_OPTIONS['DB'])
rc.delete(const.GROUP_NAME) # flush live blogs
whileTrue:
feed = feedparser.parse(const.IFANR_FEED_URL)
forentryinfeed.get('entries')[::-1]:
ifnotrc.hexists(const.GROUP_NAME, entry.get('id')):
Group(const.GROUP_NAME).send({'text': json.dumps(entry)})
rc.hset(const.GROUP_NAME, entry.get('id'), json.dumps(entry))
logger.debug('send a message %s ' % entry.get('title'))
time.sleep(5)
这个文件里的大量写法都是 Django 约束的,所以不必过分追究,我们看 handle 方法,在这里,我们做这么几件事:
从爱范儿的 RSS Feed 获取最新的文章,对每一文章,检查这篇文章是否已经发送过,如果没有发送过,则将该文章发送给群组;
在 redis 中,新建一个名为 liveblog(这个常量存储在 const.GROUP_NAME) 的哈希表,将每篇未被发送过的新文章添加到该哈希表中,这样就可以判断一篇文章是否曾经被发送过;
Group(const.GROUP_NAME).send({'text':json.dumps(entry)})
对群组进行新消息的发送;每次发送之后,休息 5 秒钟。
第四步:接收新消息并渲染
我们已经有了一个永不停歇的博客更新服务在运行着,一旦有新的内容,这个服务将为用户推送。现在,我们需要一个页面来渲染呈现新消息。
在 templates 目录下,新建 blog.html 文件:
<!DOCTYPE html>
<htmllang="en">
...
<divid="container">
<h2>Live Blog</h2>
<ulid="log">
</ul>
</div>
<scripttype="application/javascript">
varws_scheme = window.location.protocol == "https:" ? "wss" : "ws";
varws =newWebSocket(ws_scheme + '://' + window.location.host + window.location.pathname);
console.log(ws);
ws.onmessage = function(message) {
vardata = JSON.parse(message.data);
varlogList = document.querySelector('#log');
varlogItem = document.createElement('li');
varitemTmp = `
<h3>${data.title}</h3>
<p>
<date>${newDate(data.published).toLocaleString}</date>
<span>${data.source.title}</span>
</p>
`;
logItem.innerHTML = itemTmp;
logList.insertBefore(logItem, logList.firstChild);
}
</script>
</body>
</html>
在这个页面里,有一个 WebSocket Client 将被新建,并向服务器发起建立连接的请求。当有新消息,将在 #log
ul 中插入新的消息。接下来就是传统 Django 的工作:
在 urls.py 中配置 django view 的路由:
url(r'blog/