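Introduction to Django Channels
First, it helps to understand Django's existing request/response model: the browser sends a request, the Django server receives it and routes it to a view, and the view returns a response that the server sends back to the browser. Flask handles requests the same way. For ordinary page browsing (reading news, for example) this works well, but it does not work for requests that need a persistent session, because Django's lifecycle is bound to a single request: the server cannot keep pushing data to the browser client when there is no request. Such scenarios keep emerging, for example online chat rooms, conversational bots, and the recently popular microservice applications.
Channels changes how Django works by introducing a pattern built from channels, consumers, and workers: every consumer is assigned its own channel, and workers listen on the channels so that messages are handled as soon as they arrive. To make this mechanism work, Channels needs three layers:
- Interface server: the bridge between Django and the user (browser), consisting of an adapter that implements the WSGI protocol and a standalone WebSocket server.
- Channel backend: passes messages between the interface servers and the workers. It consists of pluggable Python code plus a datastore, which can be in-memory, a database, or Redis; Redis is recommended because it combines the advantages of the other two.
- Workers: listen on all channels and wake the handler function when a new message arrives.
Channels makes the Django stack more reliable and scalable: the number of servers involved can grow as needed, with a minimum of one protocol server and one worker server. With Channels you no longer have to restructure your code for asynchronous calls; Channels has already prepared everything for you. Note that the three-layer description above matches the Channels 1.x architecture; in Channels 2.x, which the requirements below pin, consumers run inside the ASGI server process and the channel layer is only needed for communication between consumers.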
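The channel / consumer / worker pattern described above can be sketched in a few lines of plain Python. This is only a toy model under stated assumptions: `ToyChannelLayer`, `worker`, and `echo_consumer` are hypothetical names, and an in-memory `asyncio.Queue` stands in for the Redis backend. It is not how Channels itself is implemented.

```python
import asyncio


class ToyChannelLayer:
    """Hypothetical in-memory channel backend: one queue per channel name."""

    def __init__(self):
        self._queues = {}

    def _queue(self, channel):
        # Create the queue lazily the first time a channel name is used.
        return self._queues.setdefault(channel, asyncio.Queue())

    async def send(self, channel, message):
        await self._queue(channel).put(message)

    async def receive(self, channel):
        return await self._queue(channel).get()


async def worker(layer, channel, consumer, count):
    """Listen on one channel and call the consumer once per message."""
    results = []
    for _ in range(count):
        message = await layer.receive(channel)
        results.append(consumer(message))
    return results


def echo_consumer(message):
    return "handled: " + message["text"]


async def demo():
    layer = ToyChannelLayer()
    # The "interface server" side pushes two messages onto the channel.
    await layer.send("websocket.receive", {"text": "hello"})
    await layer.send("websocket.receive", {"text": "world"})
    # The worker side wakes the consumer for each queued message.
    return await worker(layer, "websocket.receive", echo_consumer, count=2)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The point of the indirection is that the sender never calls the consumer directly; it only knows a channel name, which is what lets the real backend sit in Redis between separate processes.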
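Reference example: https://www.cnblogs.com/kendrick/p/7218107.html
Hands-on tutorial
- Development: Windows 10 / Production: CentOS 7
- Python 3.7
- PyCharm 2020
- Front-end framework: https://www.layui.com/layim/
- Redis 3.0.53, Windows x64
The goal of this tutorial is to build a web interaction framework for a chatbot; you can scroll straight to the bottom to see the result.
The code below requires a Redis server that is running and listening on port 6379.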
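Before starting Django it can be handy to confirm that something is actually listening on port 6379. The following is a minimal sketch using only the standard library; `is_port_open` is a hypothetical helper name, and it only checks that a TCP connection succeeds, not that the listener is really Redis.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timeout, unreachable host, and so on.
        return False


if __name__ == "__main__":
    print("Redis reachable:", is_port_open("127.0.0.1", 6379))
```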
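Demo:
Username: user001
Password: p@ssw0rdwcx
Client side, open in Google Chrome: https://www.szyfd.xyz/itkf/app/index/
Username: kefu001
Password: p@ssw0rdwcx
Service (agent) side, open in Internet Explorer: https://www.szyfd.xyz/itkf/app/index/
Front end: https://www.layui.com/doc/modules/layim.html
Back end: Python 3 + Django
Screenshot of the running application:
Project directory:
1. Create a new Django project in PyCharm
2. Install the dependencies: pip install -r requirements.txt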
aioredis==1.2.0
asgiref==3.2.7
asn1crypto==0.24.0
async-timeout==3.0.1
attrs==19.1.0
autobahn==19.9.2
Automat==0.7.0
backports.csv==1.0.7
certifi==2019.6.16
cffi==1.12.3
channels==2.2.0
channels-redis==2.3.3
chardet==3.0.4
constantly==15.1.0
cryptography==2.7
daphne==2.3.0
defusedxml==0.6.0
diff-match-patch==20181111
Django==2.1.11
django-import-export==1.2.0
django-redis==4.10.0
django-simpleui==4.0
django-utils==0.0.2
et-xmlfile==1.0.1
hiredis==1.0.0
hyperlink==19.0.0
idna==2.8
incremental==17.5.0
jdcal==1.4.1
lark-parser==0.7.4
msgpack==0.6.1
numpy==1.17.1
odfpy==1.4.0
openpyxl==2.6.3
optionaldict==0.1.1
Pillow==7.1.2
pycparser==2.19
PyHamcrest==1.9.0
PyMySQL==0.9.3
python-dateutil==2.8.0
pytz==2019.2
PyYAML==5.1.2
redis==3.3.8
requests==2.22.0
required==0.4.0
six==1.12.0
sqlparse==0.3.0
tablib==0.13.0
Twisted==19.7.0
txaio==18.8.1
urllib3==1.25.3
wechatpy==1.8.3
xlrd==1.2.0
xlwt==1.3.0
xmltodict==0.12.0
zope.interface==4.6.0
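3. Create routing.py
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

import app.routing

# Route by protocol type: WebSocket connections go through the auth
# middleware and are then matched against the app's WebSocket URL patterns.
application = ProtocolTypeRouter({
    'websocket': AuthMiddlewareStack(
        URLRouter(
            app.routing.websocket_urlpatterns
        )
    ),
})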
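To make the role of `ProtocolTypeRouter` more concrete, here is a toy stand-in that picks an application by the connection's `scope["type"]`. It is a sketch of the idea only: `ToyProtocolRouter` and `websocket_app` are hypothetical names, and the real Channels class works with ASGI applications rather than plain functions. As far as I know, Channels 2.x also falls back to Django's normal view handling for `http` when that key is not listed, which would explain why routing.py only declares `websocket`; treat that as an assumption to verify against the Channels documentation.

```python
class ToyProtocolRouter:
    """Hypothetical stand-in: choose an application by scope["type"]."""

    def __init__(self, application_mapping):
        self.application_mapping = application_mapping

    def __call__(self, scope):
        scope_type = scope["type"]
        if scope_type not in self.application_mapping:
            raise ValueError(
                "No application configured for scope type %r" % scope_type
            )
        return self.application_mapping[scope_type](scope)


def websocket_app(scope):
    # A real application would accept the socket; here we just describe it.
    return "websocket handler for " + scope["path"]


router = ToyProtocolRouter({"websocket": websocket_app})

if __name__ == "__main__":
    print(router({"type": "websocket", "path": "/ws/chat/"}))
```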
4. Configure settings.py
"""
Django settings for itkf project.

Generated by 'django-admin startproject' using Django 3.0.5.

For more information on this file, see
https://docs.djangoproject.com/en/3.0/topics/settings/

For the full list of settings and their values, see
https://docs.djangoproject.com/en/3.0/ref/settings/
"""

import os

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/3.0/howto/deployment/checklist/

# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '&avamfpy-nj-9q#91nn89^(zjl0s-&iu3*+g+strp&qjxqwerh'

# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True

ALLOWED_HOSTS = ["*"]

# Application definition
INSTALLED_APPS = [
    'simpleui',
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app.apps.AppConfig',
    'import_export',
    'channels',
]

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

ROOT_URLCONF = 'itkf.urls'

TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [os.path.join(BASE_DIR, 'templates')],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
            ],
        },
    },
]

# Entry point used by the ASGI server (Channels)
ASGI_APPLICATION = 'itkf.routing.application'
WSGI_APPLICATION = 'itkf.wsgi.application'

# Database
# https://docs.djangoproject.com/en/3.0/ref/settings/#databases

# Redis cache configuration
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379/",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 100},
            "COMPRESSOR": "django_redis.compressors.zlib.ZlibCompressor",
            # "PASSWORD": "password",
        }
    }
}

# Channel layer backed by the same Redis server
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'HOST': '127.0.0.1',
        'PORT': '3306',
        'NAME': 'itkf',
        'USER': 'root',
        'PASSWORD': '123456'
    }
}

# Redis path on the development machine: C:\Program Files\Redis
# Start it with: redis-server redis.windows.conf
'''
The first start of Redis on Windows may fail with:
[2368] 21 Apr 02:57:05.611 # Creating Server TCP listening socket 127.0.0.1:6379: bind: No error
Fix: run the following on the command line
redis-cli.exe
127.0.0.1:6379>shutdown
not connected>exit
and then run redis-server.exe redis.windows.conf again
'''

# Password validation
# https://docs.djangoproject.com/en/3.0/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
    {
        'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
    },
]

# Internationalization
# https://docs.djangoproject.com/en/3.0/topics/i18n/
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_L10N = True
USE_TZ = False

# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/3.0/howto/static-files/
domain = "http://127.0.0.1:8000"

# Image upload path
MEDIA_URL = '/'
MEDIA_ROOT = r'D:/itkf/itkfstatic/uploadImage/'
STATIC_URL = '/itkfstatic/'

# SimpleUI configuration
SIMPLEUI_HOME_INFO = False
SIMPLEUI_STATIC_OFFLINE = True

STATICFILES_DIRS = (
    os.path.join(BASE_DIR, 'itkfstatic'),
)

# Login page
LOGIN_URL = '/itkf/admin/login/'

# Session configuration
SESSION_ENGINE = 'django.contrib.sessions.backends.db'  # session engine (default)
SESSION_COOKIE_NAME = "sessionid"  # cookie key stored in the browser, i.e. sessionid=<random string> (default)
SESSION_COOKIE_PATH = "/"  # path the session cookie is valid for (default)
SESSION_COOKIE_DOMAIN = None  # domain the session cookie is valid for (default)
SESSION_COOKIE_SECURE = False  # whether the cookie is sent over HTTPS only (default)
SESSION_COOKIE_HTTPONLY = True  # HttpOnly flag: hide the cookie from JavaScript (default)
SESSION_COOKIE_AGE = 1209600  # session cookie lifetime in seconds, 2 weeks (default)
SESSION_EXPIRE_AT_BROWSER_CLOSE = True  # expire the session when the browser closes (Django's default is False)
SESSION_SAVE_EVERY_REQUEST = False  # save the session on every request; by default only when modified (default)

# WeChat Work (enterprise WeChat) settings used by the app
weChatWork = {
    'corpid': "",
    'secret': "",
    'sourceFile': "static/source",
    'serviceUser_': 'serviceUser_',
    'customeUser_': 'customeUser_',
    "media_image_url": "/itkfstatic/uploadImage/",
    "avatar_image_url": "/itkfstatic/avatar/"
}
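The settings above hard-code `SECRET_KEY` and the database password, which the generated comments themselves warn against for production. One common alternative is to read such values from environment variables. The sketch below assumes hypothetical variable names (`ITKF_DB_HOST`, `ITKF_DB_PASSWORD`, and so on) and a hypothetical helper `env_setting`; none of them appear in the original project.

```python
import os


def env_setting(name, default=None, required=False):
    """Read a setting from the environment, optionally failing if missing."""
    value = os.environ.get(name, default)
    if required and value is None:
        raise RuntimeError("Missing required environment variable: " + name)
    return value


def build_database_settings():
    """Build a DATABASES dict; the variable names are assumptions."""
    return {
        "default": {
            "ENGINE": "django.db.backends.mysql",
            "HOST": env_setting("ITKF_DB_HOST", "127.0.0.1"),
            "PORT": env_setting("ITKF_DB_PORT", "3306"),
            "NAME": env_setting("ITKF_DB_NAME", "itkf"),
            "USER": env_setting("ITKF_DB_USER", "root"),
            # No default on purpose: a missing password should fail loudly.
            "PASSWORD": env_setting("ITKF_DB_PASSWORD", required=True),
        }
    }
```

In settings.py this would be used as `DATABASES = build_database_settings()`, with the variables exported in the shell or service definition before the server starts.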
5. Configure urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('itkf/admin/', admin.site.urls),
    path('itkf/app/', include("app.urls")),
]
6. Configure __init__.py in the project package
import pymysql

# pymysql.version_info = (1, 3, 13, "final", 0)
# Let Django's MySQL backend use PyMySQL in place of MySQLdb
pymysql.install_as_MySQLdb()
7. App (app) >> models.py
from django.contrib.auth.models import User from django.db import models # Create your models here. from django.utils.html import format_html from django.db import models import datetime import uuid from django.db import models from django.contrib.auth.models import User # Create your models here. from django.utils.html import format_html from django.db.models import IntegerField, Model from django.core.validators import MaxValueValidator, MinValueValidator import datetime import random, os # Create your models here. from django.contrib.auth.models import AbstractUser from django.db import models ENV_PROFILE = os.getenv("ENV") if ENV_PROFILE == "test": import itkf.test_settings as config elif ENV_PROFILE == "production": import itkf.prd_settings as config else: import itkf.settings as config corpid = config.weChatWork["corpid"] sourceFile = config.weChatWork["sourceFile"] media_image_url = config.weChatWork["media_image_url"] def rename(newname): def decorator(fn): fn.__name__ = newname return fn return decorator def newImageName(instance, filename): filename = '{}.{}'.format(uuid.uuid4().hex, "png") return filename # 生成预约订单号 # 用时间生成一个唯一随机数 def random_with_N_digits(n): range_start = 10 ** (n - 1) range_end = (10 ** n) - 1 return random.randint(range_start, range_end) def get_ran_dom(): nowTime = datetime.datetime.now().strftime("%Y%m%d%H%M%S") # 生成当前时间 randomNum = random_with_N_digits(3) # 生成的随机整数n,其中0<=n<=100 if randomNum <= 10: randomNum = str(0) + str(randomNum) uniqueNum = str(nowTime) + str(randomNum) return uniqueNum # 应用管理 class agent(models.Model): name = models.CharField(max_length=225, verbose_name="部门名称", blank=True, default="") agentid = models.CharField(max_length=225, verbose_name="应用ID", blank=True, default="") secret = models.CharField(max_length=225, verbose_name="应用密钥", blank=True, default="") avatar = models.ImageField(max_length=225, verbose_name="部门Logo", blank=True, default="") conversationTime = models.IntegerField(verbose_name="会话时长(分钟)", 
default=20) webhook_url = models.URLField(verbose_name="群机器人地址", default="", blank=True, null=True) createTime = models.DateTimeField(auto_now_add=True, verbose_name="创建时间") lastTime = models.DateTimeField(auto_now=True, verbose_name="修改时间") author = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE, verbose_name="创建者", related_name="agent_author") editor = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE, verbose_name="修改者", related_name="agent_creator") @rename("部门Logo") def showAvatar(self): return format_html("<img src='{}{}' style='width: 60px;height: 60px;' class='showAvatar' />", media_image_url, self.avatar) @rename("详情") def checkMessage(self): return format_html("<a href='/app/index/{}.html' target='blank'>回复</a>", self.id) class Meta: verbose_name = verbose_name_plural = '部门管理' ordering = ['id'] def __str__(self): return self.name # 客服人员 class KF(models.Model): agent = models.ForeignKey(agent, null=True, on_delete=models.CASCADE, verbose_name="应用名称") username = models.CharField(max_length=225, verbose_name="姓名", blank=True, default="") userid = models.CharField(max_length=225, verbose_name="UM", blank=True, default="") status = models.BooleanField(verbose_name="是否在线", default=False) avatar = models.ImageField(max_length=225, verbose_name="头像", blank=True, default="") createTime = models.DateTimeField(auto_now_add=True, verbose_name="创建时间") lastTime = models.DateTimeField(auto_now=True, verbose_name="修改时间") author = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE, verbose_name="创建者", related_name="kf_author") editor = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE, verbose_name="修改者", related_name="kf_creator") class Meta: verbose_name = verbose_name_plural = '在线客服' ordering = ['id'] @rename("头像") def showAvatar(self): return format_html("<img src='{}{}' style='width: 60px;height: 60px;' class='showAvatar' />", media_image_url, self.avatar) def __str__(self): 
return self.username # 行内员工 def randomSign(): switch = { 0: "只要还有明天,今天就永远是起跑线。", 1: "只要还有明天,今天就永远是起跑线。", 2: "只要还有明天,今天就永远是起跑线。" } return switch[0] class userList(models.Model): agent = models.ForeignKey(agent, null=True, on_delete=models.CASCADE, verbose_name="应用名称") username = models.CharField(max_length=225, verbose_name="姓名", blank=True, default="") userid = models.CharField(max_length=225, verbose_name="UM", blank=True, default="") avatar = models.ImageField(max_length=225, verbose_name="头像", blank=True, default="") sign = models.CharField(max_length=225, verbose_name="个性签名", blank=True, default=randomSign) ISLEAD_CHOICES = ((0, '是'), (1, '否'),) islead = models.IntegerField(choices=ISLEAD_CHOICES, verbose_name="等级", default=1) createTime = models.DateTimeField(auto_now_add=True, verbose_name="创建时间") lastTime = models.DateTimeField(auto_now=True, verbose_name="修改时间") author = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE, verbose_name="创建者", related_name="userlist_author") editor = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE, verbose_name="修改者", related_name="userlist_creator") @rename("头像") def showAvatar(self): return format_html("<img src='{}{}' style='width: 60px;height: 60px;' class='showAvatar' />", media_image_url, self.avatar) class Meta: verbose_name = verbose_name_plural = '用户列表' ordering = ['id'] def __str__(self): return self.username # 接受的消息 class Message(models.Model): ToUserName = models.CharField(max_length=225, verbose_name="接受者", blank=True, default="") FromUserName = models.CharField(max_length=225, verbose_name="发送者", blank=True, default="") CreateTime = models.DateTimeField(verbose_name="发送时间", blank=True, default=None) MsgId = models.CharField(max_length=225, verbose_name="消息ID", blank=True, default="") AgentID = models.CharField(max_length=225, verbose_name="部门名称", blank=True, default="") MsgType = models.CharField(max_length=225, verbose_name="消息类型", blank=True, default="") content = 
models.TextField(max_length=2000, verbose_name="消息内容", blank=True, default="") userList = models.ForeignKey('userList', null=True, to_field="id", on_delete=models.CASCADE) createDateTime = models.DateTimeField(auto_now_add=True, verbose_name="创建时间") lastTime = models.DateTimeField(auto_now=True, verbose_name="修改时间") author = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE, verbose_name="创建者", related_name="message_author") editor = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE, verbose_name="修改者", related_name="message_creator") class Meta: verbose_name = verbose_name_plural = '所有消息' ordering = ['id'] def __str__(self): return self.FromUserName # 员工服务 class staffService(models.Model): agent = models.ForeignKey('agent', null=True, on_delete=models.CASCADE, verbose_name="应用名称") title = models.CharField(max_length=225, verbose_name="标题", blank=True, default="") avatar = models.ImageField(max_length=225, verbose_name="头像", blank=True, default="") desc = models.TextField(max_length=500, verbose_name="描述", default="", blank=True, null=True) welcomeText = models.TextField(max_length=2000, verbose_name="欢迎语", blank=True, default="") firstText = models.TextField(max_length=2000, verbose_name="会话提示语", blank=True, default="您好,很高兴为您服务!") notuserText = models.TextField(max_length=2000, verbose_name="客服不在线提示语", blank=True, default="非常抱歉,客服处于离线状态,您的消息我们已发送IT服务台,马上会有IT同事跟进处理!") createTime = models.DateTimeField(auto_now_add=True, verbose_name="创建时间") lastTime = models.DateTimeField(auto_now=True, verbose_name="修改时间") author = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE, verbose_name="创建者", related_name="staffService_author") editor = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE, verbose_name="修改者", related_name="staffService_creator") class Meta: verbose_name = verbose_name_plural = '员工服务' ordering = ['createTime'] @rename("头像") def showAvatar(self): return format_html("<img 
src='{}{}' style='width: 60px;height: 60px;' class='showAvatar' />", media_image_url, self.avatar) def __str__(self): return self.title class knowledgeBase(models.Model): questionType = models.CharField(max_length=225, verbose_name="问题类型", blank=True, default="") key = models.CharField(max_length=225, verbose_name="关键字", blank=True, default="") rule = models.IntegerField(choices=((0, '包含'), (1, '完全匹配')), default=0, verbose_name='规则') answerType = models.IntegerField(choices=((0, '文字'), (1, '图文'), (2, '图片'), (3, '语音'), (4, '视频')), default=0, verbose_name='发送类型') content = models.TextField(max_length=2000, verbose_name="消息内容", blank=True, default="") createTime = models.DateTimeField(auto_now_add=True, verbose_name="创建时间") lastTime = models.DateTimeField(auto_now=True, verbose_name="修改时间") author = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE, verbose_name="创建者", related_name="knowledgeBase_author") editor = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE, verbose_name="修改者", related_name="knowledgeBase_creator") class Meta: verbose_name = verbose_name_plural = '知识库' ordering = ['id'] def __str__(self): return self.content
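One detail in models.py is worth a closer look: `random_with_N_digits(3)` always returns a value between 100 and 999, so the `randomNum <= 10` padding branch in `get_ran_dom` never runs, and the inline comment about `0<=n<=100` does not match the code. A possible variant that zero-pads the suffix instead is sketched below. `unique_number` and its `now` / `rng` parameters are hypothetical names added for testability, and, like the original, the result is only probably unique, not guaranteed unique.

```python
import datetime
import random


def unique_number(now=None, rng=random):
    """Timestamp (YYYYmmddHHMMSS) plus a zero-padded 3-digit random suffix."""
    if now is None:
        now = datetime.datetime.now()
    suffix = rng.randint(0, 999)  # inclusive on both ends
    return now.strftime("%Y%m%d%H%M%S") + "{:03d}".format(suffix)


if __name__ == "__main__":
    print(unique_number())
```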
8. App (app) >> admin.py
from django.contrib import admin # Register your models here. from import_export import resources from import_export.admin import ImportExportModelAdmin from wechatpy.enterprise import WeChatClient from wechatpy.enterprise.crypto import WeChatCrypto from wechatpy.exceptions import InvalidSignatureException import itkf as config import os from app import models admin.site.site_title = "企业号后台管理" admin.site.site_header = "企业号后台管理" # 企业号ID ENV_PROFILE = os.getenv("ENV") if ENV_PROFILE == "test": import itkf.test_settings as config elif ENV_PROFILE == "production": import itkf.prd_settings as config else: import itkf.settings as config class agentResource(resources.ModelResource): def get_export_headers(self): # 是你想要的导出头部标题headers return ['应用名称', '欢迎语', '会话提示语', '部门Logo', '创建时间', '修改时间', '创建者', '修改者'] class Meta: field = ('name', 'welcomeText', 'firstText', 'avatar', 'createTime', 'lastTime', 'author', 'editor') model = models.agent fields = field export_order = field @admin.register(models.agent) class agentAdmin(ImportExportModelAdmin): fields = ( 'name', 'avatar', 'agentid', 'secret', 'webhook_url', 'conversationTime') # 需要显示的字段信息 list_display = ('showAvatar', 'name', 'webhook_url', 'conversationTime', 'createTime', 'lastTime', 'author', 'editor', 'checkMessage') exclude = ('author', 'editor') # 设置哪些字段可以点击进入编辑界面,默认是第一个字段 list_display_links = ('showAvatar', 'name',) model_icon = "fa fa-tag" list_per_page = 10 resource_class = agentResource def save_model(self, request, obj, form, change): if form.is_valid(): if not change: obj.author = request.user obj.editor = request.user obj.save() super().save_model(request, obj, form, change) class KFResource(resources.ModelResource): def get_export_headers(self): # 是你想要的导出头部标题headers return ['姓名', 'UM', '头像'] class Meta: field = ('username', 'userid', 'sign', 'avatar',) model = models.KF fields = field export_order = field @admin.register(models.KF) class KFAdmin(ImportExportModelAdmin): fields = ("agent", "avatar", "username", 
'userid') # 需要显示的字段信息 list_display = ("agent", "showAvatar", 'username', 'userid', 'status', 'createTime', 'lastTime', 'author', 'editor') exclude = ('status',) # 设置哪些字段可以点击进入编辑界面,默认是第一个字段 list_display_links = ('username',) model_icon = "fa fa-tag" list_per_page = 10 resource_class = KFResource def save_model(self, request, obj, form, change): if form.is_valid(): if not change: obj.author = request.user obj.editor = request.user obj.save() super().save_model(request, obj, form, change) class userListResource(resources.ModelResource): def get_export_headers(self): # 是你想要的导出头部标题headers return ['姓名', 'UM', '头像'] class Meta: field = ('username', 'userid', 'avatar', 'createTime', 'lastTime', 'author', 'editor') model = models.userList fields = field export_order = field @admin.register(models.userList) class userListAdmin(ImportExportModelAdmin): fields = ('avatar', 'username', 'userid', 'islead',) # 需要显示的字段信息 list_display = ('showAvatar', 'username', 'userid', 'islead', 'createTime', 'lastTime', 'author', 'editor') # 设置哪些字段可以点击进入编辑界面,默认是第一个字段 list_display_links = ('username',) search_fields = ('username', 'userid') model_icon = "fa fa-tag" list_per_page = 50 resource_class = userListResource def save_model(self, request, obj, form, change): if form.is_valid(): if not change: obj.author = request.user obj.editor = request.user obj.save() super().save_model(request, obj, form, change) class MessageResource(resources.ModelResource): def get_export_headers(self): # 是你想要的导出头部标题headers return ['企业号ID', '发送者', '发送时间', '消息ID', '应用ID', '消息类型', '消息内容'] class Meta: field = ('FromUserName', 'CreateTime', 'MsgId', 'AgentID', 'MsgType', 'content') model = models.Message fields = field export_order = field # Register your models here. 
@admin.register(models.Message) class MessageAdmin(ImportExportModelAdmin): fields = ('FromUserName', 'CreateTime', 'MsgId', 'AgentID', 'MsgType', 'content') # 需要显示的字段信息 list_display = ( 'id', 'ToUserName', 'FromUserName', 'CreateTime', 'MsgId', 'AgentID', 'MsgType', 'content', 'author') # 设置哪些字段可以点击进入编辑界面,默认是第一个字段 list_display_links = ('id', 'FromUserName') model_icon = "fa fa-tag" list_per_page = 10 resource_class = MessageResource class knowledgeBaseResource(resources.ModelResource): def get_export_headers(self): # 是你想要的导出头部标题headers return ['问题类型', '关键字', '规则', '发送类型', '消息内容', '创建时间', '修改时间', '创建者', '修改者'] class Meta: field = ('questionType', 'key', 'rule', 'answerType', 'content', 'createTime', 'lastTime', 'author', 'editor') model = models.knowledgeBase fields = field export_order = field # Register your models here. @admin.register(models.knowledgeBase) class knowledgeBaseAdmin(ImportExportModelAdmin): fields = ('questionType', 'key', 'rule', 'answerType', 'content') # 需要显示的字段信息 list_display = ('id', 'questionType', 'key', 'rule', 'answerType', 'content') # 设置哪些字段可以点击进入编辑界面,默认是第一个字段 list_display_links = ('id', 'questionType') model_icon = "fa fa-tag" list_per_page = 10 resource_class = knowledgeBaseResource def save_model(self, request, obj, form, change): if form.is_valid(): if not change: obj.author = request.user obj.editor = request.user obj.save() super().save_model(request, obj, form, change) class staffServiceResource(resources.ModelResource): class Meta: field = ( 'agent', 'title', 'avatar', 'welcomeText', 'firstText', 'notuserText', "desc", 'createTime', 'lastTime', 'author', 'editor') model = models.staffService fields = field export_order = field # Register your models here. 
@admin.register(models.staffService) class staffServiceBaseAdmin(ImportExportModelAdmin): fields = ('agent', 'title', 'avatar', "desc", 'welcomeText', 'firstText', 'notuserText',) # 需要显示的字段信息 list_display = ( 'showAvatar', 'agent', 'title', "desc", 'welcomeText', 'firstText', 'notuserText', 'createTime', 'lastTime', 'author', 'editor') # 设置哪些字段可以点击进入编辑界面,默认是第一个字段 list_display_links = ('agent', 'title',) model_icon = "fa fa-tag" list_per_page = 10 resource_class = staffServiceResource def save_model(self, request, obj, form, change): if form.is_valid(): if not change: obj.author = request.user obj.editor = request.user obj.save() super().save_model(request, obj, form, change)
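The same `ENV_PROFILE` if/elif block that selects a settings module is repeated in models.py, admin.py, consumers.py, and views.py. It could be collected into one small helper, sketched here with hypothetical names (`SETTINGS_BY_ENV`, `settings_module_name`, `load_config`). The module paths are the ones the original code imports. Note that the more conventional Django approach is to set `DJANGO_SETTINGS_MODULE` and use `from django.conf import settings`.

```python
import importlib
import os

# Module paths taken from the imports in the original code.
SETTINGS_BY_ENV = {
    "test": "itkf.test_settings",
    "production": "itkf.prd_settings",
}
DEFAULT_SETTINGS = "itkf.settings"


def settings_module_name(env=None):
    """Map an environment name (default: the ENV variable) to a module path."""
    if env is None:
        env = os.getenv("ENV")
    return SETTINGS_BY_ENV.get(env, DEFAULT_SETTINGS)


def load_config(env=None):
    """Import and return the selected settings module."""
    return importlib.import_module(settings_module_name(env))
```

Each file could then start with `config = load_config()` instead of repeating the branch.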
9. App (app) >> consumers.py
import json
import os

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django.core.cache import cache
from django.utils.safestring import mark_safe

from app import models

# Pick the settings module from the ENV environment variable
ENV_PROFILE = os.getenv("ENV")
if ENV_PROFILE == "test":
    import itkf.test_settings as config
elif ENV_PROFILE == "production":
    import itkf.prd_settings as config
else:
    import itkf.settings as config

corpid = config.weChatWork["corpid"]
sourceFile = config.weChatWork["sourceFile"]
serviceUser = config.weChatWork["serviceUser_"]
customeUser = config.weChatWork["customeUser_"]
media_image_url = config.weChatWork["media_image_url"]


# Wrap the content in the markup that matches the message type
def sendContent(item):
    MsgType = item["MsgType"]
    content = item["content"]
    if MsgType == "image":
        content = mark_safe("img[{0}]".format(content))
    if MsgType == "video":
        content = mark_safe("video[{0}]".format(content))
    if MsgType == "voice":
        content = mark_safe("audio[{0}]".format(content))
    return {'username': item['username'], 'avatar': item['avatar'], 'id': item['id'],
            'type': 'friend', 'content': content}


class ChatConsumer(AsyncWebsocketConsumer):
    # The ORM is synchronous, so database access from this async consumer
    # goes through database_sync_to_async instead of being called directly.
    @database_sync_to_async
    def set_kf_online(self, username):
        kfUser = models.KF.objects.filter(userid=username).first()
        if kfUser:
            kfUser.status = True
            kfUser.save()
        return kfUser

    @database_sync_to_async
    def set_kf_offline(self, username):
        return models.KF.objects.filter(userid=username).update(status=False)

    async def connect(self):
        print("connect")
        self.user = self.scope["user"]
        username = self.user.username
        print("username===========", username)
        # Customer-service agent comes online
        kfUser = await self.set_kf_online(username)
        if kfUser:
            # agent_id reads the stored foreign key without another query
            groupName = serviceUser + str(kfUser.agent_id) + username
        else:
            groupName = customeUser + username
        self.room_group_name = groupName
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        await self.accept()
        # Flush any messages cached in Redis as soon as the socket opens
        await self.receive(text_data="once")

    async def disconnect(self, close_code):
        print("disconnect")
        print(close_code)
        print(self.scope["user"])
        # Leave room group
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )
        # Customer-service agent goes offline
        kfUser = await self.set_kf_offline(self.scope["user"].username)
        print(kfUser)

    async def receive(self, text_data=None, bytes_data=None):
        self.user = self.scope["user"]
        # Send message to room group
        # loginKF = models.KF.objects.filter(userid=self.user).first()
        AgentID = "1"
        redisMessage = cache.iter_keys(AgentID + "$*")
        for msgg in redisMessage:
            # The key may have expired in the meantime, hence the fallback
            currentMsg = cache.get(msgg) or []
            for item in currentMsg:
                message = sendContent(item)
                await self.channel_layer.group_send(
                    self.room_group_name,
                    {
                        'type': 'chat_message',
                        'message': message
                    }
                )
            # Delete the key only if nothing new was appended while sending
            newRedisMessage = cache.get(msgg)
            if currentMsg == newRedisMessage:
                cache.delete(msgg)

    async def chat_message(self, message):
        print(message)
        # Send message to WebSocket
        await self.send(text_data=json.dumps(message))
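The `'type': 'chat_message'` key in the messages passed to `group_send` is what selects the `chat_message` method on the consumer. The sketch below imitates that dispatch-by-type idea in plain Python. `ToyConsumer` is a hypothetical stand-in, and the dot-to-underscore conversion is my understanding of how Channels derives the handler name, so check the Channels documentation before relying on it.

```python
import asyncio


class ToyConsumer:
    """Hypothetical stand-in that mimics dispatching events by their type."""

    def __init__(self):
        self.sent = []

    async def dispatch(self, event):
        # "chat.message" and "chat_message" both map to chat_message().
        handler_name = event["type"].replace(".", "_")
        handler = getattr(self, handler_name, None)
        if handler is None or handler_name.startswith("_"):
            raise ValueError("No handler for message type %s" % event["type"])
        await handler(event)

    async def chat_message(self, event):
        # The real consumer would call self.send(text_data=...) here.
        self.sent.append(event["content"])


async def demo_dispatch():
    consumer = ToyConsumer()
    await consumer.dispatch({"type": "chat_message", "content": "hi"})
    await consumer.dispatch({"type": "chat.message", "content": "there"})
    return consumer.sent


if __name__ == "__main__":
    print(asyncio.run(demo_dispatch()))
```

This is also why every payload built in views.py carries `"type": "chat_message"`: without a type that resolves to a method on `ChatConsumer`, the group message would have no handler.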
10. App (app) >> routing.py
from django.urls import path

from app.consumers import ChatConsumer

websocket_urlpatterns = [
    path('ws/chat/', ChatConsumer),
]
11. App (app) >> urls.py
from django.urls import path

from app import views

urlpatterns = [
    # path('login/', views.login),  # authorized login
    path('requestInfo/', views.requestInfo),  # request information
    path('index/', views.index),  # PC customer-service (agent) home page
    path('custome/', views.custome),  # PC customer home page
    path('GetUserList/', views.GetUserList),  # user information
    path('departmentServices/', views.departmentServices),  # department services
    path('uploadImage/', views.uploadImage),  # upload an image
    path('getWelcomeText/', views.getWelcomeText),  # welcome text
    # Update the agent's online/offline status
    path('updateUserStatus/', views.updateUserStatus),
]
12. App (app) >> views.py
import wechatpy from django.contrib import auth from wechatpy.enterprise.client.api import WeChatOAuth from wechatpy.enterprise.exceptions import InvalidCorpIdException from wechatpy import enterprise, parse_message from django.shortcuts import render, redirect from django.http import JsonResponse, HttpResponse, HttpResponseRedirect from django.contrib.auth import authenticate, login, logout from django.contrib.auth.decorators import login_required from django.contrib.auth.forms import UserCreationForm from django.contrib.auth.models import User import uuid, datetime, json, time import itkf as config from django.utils.safestring import mark_safe from wechatpy.enterprise.crypto import WeChatCrypto from wechatpy.exceptions import InvalidSignatureException import os import urllib import itkf from app import models from django.core.cache import cache from wechatpy.enterprise import WeChatClient from wechatpy.session.redisstorage import RedisStorage from redis import Redis from urllib.parse import quote from django.utils.safestring import mark_safe import json import urllib3 ENV_PROFILE = os.getenv("ENV") if ENV_PROFILE == "test": import itkf.test_settings as config elif ENV_PROFILE == "production": import itkf.prd_settings as config else: import itkf.settings as config corpid = config.weChatWork["corpid"] sourceFile = config.weChatWork["sourceFile"] serviceUser = config.weChatWork["serviceUser_"] customeUser = config.weChatWork["customeUser_"] media_image_url = config.weChatWork["media_image_url"] avatar_image_url = config.weChatWork["avatar_image_url"] domain = config.domain # 群消息提问内容 def template_string(**kwargs): return """<font color="warning">新消息</font> > 姓名:{username} > UM:{um} > 发送内容:{content} > 点击查看:{url} """.format(**kwargs) # 查看请求信息 def requestInfo(request): result = request.environ.items() return render(request, 'requestInfo.html', {'rinfo': result}) # 登录功能 # 注销 def logout(request): kfUser = models.KF.objects.filter(userid=request.user.username).first() 
kfUser.status = False kfUser.save() auth.logout(request) return redirect('/admin/login/') # 注册 def registered(request, userid): user = User.objects.filter(username=userid).first() if not user: user = User.objects.create_user(username=userid, email=str(userid) + "@pingan.com.cn", password=uuid.uuid1(), is_staff=True, is_active=True) auth.login(request, user) return user # Create your tests here. @login_required def index(request): method = request.method.upper() agentid = request.GET.get("nid", 1) if method == "GET": return render(request, "index.html") elif method == "POST": UserId = request.user.username kfUser = models.KF.objects.filter(userid=UserId).first() kfUser.status = True kfUser.save() print(request.POST) Content = request.POST.get('mine[content]') userid = request.POST.get('mine[id]') FromUserName = request.POST.get('to[id]') cur_time = datetime.datetime.now() models.Message.objects.create(ToUserName=userid, AgentID=kfUser.agent, FromUserName=FromUserName, content=Content, CreateTime=cur_time, author=request.user, editor=request.user) from channels.layers import get_channel_layer channel_layer = get_channel_layer() from asgiref.sync import async_to_sync print("FromUserName============", FromUserName) obj = { "username": request.user.first_name , "avatar": media_image_url + str(kfUser.avatar) , "id": FromUserName , "type": "chat_message" , "content": Content } print("rindex===========", FromUserName[str(FromUserName).rindex("_") + 1:]) print("customeUser + FromUserName=============", customeUser + FromUserName) async_to_sync(channel_layer.group_send)(customeUser + FromUserName[str(FromUserName).rindex("_") + 1:], obj) obj = { "username": request.user.first_name , "avatar": media_image_url + str(kfUser.avatar) , "id": FromUserName , "type": "chat_message" , "content": Content } currentOnlineKF = models.KF.objects.filter(agent__id=agentid, status=True).exclude(userid=UserId) print(currentOnlineKF.query) for item in currentOnlineKF: 
            async_to_sync(channel_layer.group_send)(serviceUser + str(agentid) + item.userid, obj)
        result = {"code": 200, "msg": "ok"}
        return JsonResponse(result)


# @login_required
@login_required
def custome(request):
    method = request.method.upper()
    agentid = request.GET.get("nid", 1)
    if method == "GET":
        return render(request, "custome.html", {"agentid": agentid, "currentUser": request.user})
    else:
        UserId = request.user.username
        print(request.POST)
        Content = request.POST.get('mine[content]')
        FromUserName = request.POST.get('to[userid]')
        cur_time = datetime.datetime.now()
        models.Message.objects.create(ToUserName=UserId, AgentID=agentid, FromUserName=FromUserName,
                                      content=Content, CreateTime=cur_time, author=request.user,
                                      editor=request.user)
        # Number of customer-service agents currently online for this agent id
        currentKF = models.KF.objects.filter(agent__id=agentid, status=True).count()
        from channels.layers import get_channel_layer
        channel_layer = get_channel_layer()
        from asgiref.sync import async_to_sync
        if currentKF > 0:
            obj = {
                "username": request.user.first_name,
                "avatar": avatar_image_url + UserId + ".png",
                "id": FromUserName,
                "type": "chat_message",
                "content": Content,
            }
            # TODO (unfinished): on the first message of a session, query the knowledge base and reply directly
            currentOnlineKF = models.KF.objects.filter(agent__id=agentid, status=True).exclude(userid=UserId)
            print(currentOnlineKF)
            for item in currentOnlineKF:
                # agentid may be the int default 1 or a str from the query string, so convert it explicitly
                async_to_sync(channel_layer.group_send)(serviceUser + str(agentid) + item.userid, obj)
        else:
            # No agent is online: reply with the service's "nobody available" text
            sid = str(FromUserName).split('_')[1]
            print("staffService===========", sid)
            staffService = models.staffService.objects.filter(id=sid).first()
            obj = {
                "username": staffService.title,
                "avatar": media_image_url + str(staffService.avatar),
                "id": FromUserName,
                "type": "chat_message",
                "content": staffService.notuserText,
            }
            async_to_sync(channel_layer.group_send)(
                customeUser + FromUserName[str(FromUserName).rindex("_") + 1:], obj)
        result = {"code": 200, "msg": "ok"}
        return JsonResponse(result)


@login_required
def GetUserList(request):
    loginUser = request.user.username
    print("loginUser=================", loginUser)
    user = models.KF.objects.filter(userid=loginUser).first()
    mine = {"username": user.username, "id": user.userid, "status": "online", "sign": "客服001",
            "avatar": media_image_url + str(user.avatar)}
    # The 200 most recent users that belong to the same agent
    ulist = models.userList.objects.filter(agent__agentid=user.agent.agentid).values(
        "id", "username", "userid", "avatar", "sign").order_by("-createTime")[0:200]
    for item in ulist:
        item["avatar"] = media_image_url + item["avatar"]
        print(item["avatar"])
    # Group names: today, the day before yesterday, three days ago, replied, not replied
    friend = [{"groupname": "今天", "id": 1, "online": len(ulist), "list": list(ulist)},
              {"groupname": "前天", "id": 2, "online": 0, "list": []},
              {"groupname": "三天前", "id": 4, "online": 0, "list": []},
              {"groupname": "已回复", "id": 5, "online": 0, "list": []},
              {"groupname": "未回复", "id": 6, "online": 0, "list": []}]
    return JsonResponse({'code': 0, 'msg': "", "data": {"mine": mine, "friend": friend, "group": []}})


@login_required
def departmentServices(request):
    loginUser = request.user.username
    mine = {"username": request.user.first_name, "id": request.user.id, "status": "online",
            "sign": "127.0.0.1", "avatar": avatar_image_url + loginUser + ".png"}
    # Friend groups (one per agent)
    friend = []
    agent = models.agent.objects.order_by("agentid")
    for item in agent:
        # Friend list (the services offered by this agent)
        ulist = []
        staff = models.staffService.objects.filter(agent__agentid=item.agentid)
        for s in staff:
            ulist.append(
                {"id": serviceUser + str(s.id) + "_" + loginUser, "username": s.title,
                 "userid": serviceUser + str(s.id) + "_" + loginUser,
                 "avatar": media_image_url + str(s.avatar), "sign": s.desc})
        friend.append({"groupname": item.name, "id": item.id, "online": len(ulist), "list": list(ulist)})
    print("friend=============", friend)
    return JsonResponse({'code': 0, 'msg': "", "data": {"mine": mine, "friend": friend, "group": []}})


@login_required
def uploadImage(request):
    file = request.FILES.get("file")
    # 2. Create a file to store the image.
    # file.name holds the original name of the upload; a UUID-based name is used here instead.
    fileName = str(uuid.uuid4()) + ".png"
    save_path = config.MEDIA_ROOT + "/" + fileName
    with open(save_path, 'wb') as f:
        # 3. Read the uploaded content chunk by chunk and write it to the new file
        for content in file.chunks():
            f.write(content)
    return JsonResponse(
        {'code': 0, 'msg': "", "data": {"src": "{}/itkfstatic/uploadImage/{}".format(domain, fileName)}})


@login_required
def getWelcomeText(request):
    data = request.POST.get("data", None)
    name = request.POST["data[name]"]
    type = request.POST["data[type]"]
    avatar = request.POST["data[avatar]"]
    id = request.POST["data[id]"]
    # Current service id. The original indexed a single character (id[str(id).find("_") + 1]),
    # which breaks for ids of two or more digits; split the same way custome() does instead.
    nid = str(id).split("_")[1]
    import datetime
    # Current time
    now = datetime.datetime.now()
    # Midnight today
    zeroToday = now - datetime.timedelta(hours=now.hour, minutes=now.minute, seconds=now.second,
                                         microseconds=now.microsecond)
    # 23:59:59 today
    lastToday = zeroToday + datetime.timedelta(hours=23, minutes=59, seconds=59)
    seconds = int((lastToday - now).total_seconds())
    redisKey = request.user.username + "$" + nid
    # Send the welcome text at most once per user and service per day
    if not cache.get(redisKey):
        cache.set(redisKey, nid, timeout=seconds)
        staff = models.staffService.objects.filter(id=nid).first()
        from channels.layers import get_channel_layer
        channel_layer = get_channel_layer()
        from asgiref.sync import async_to_sync
        obj = {
            "username": name,
            "avatar": avatar,
            "id": id,
            "type": "chat_message",
            "content": staff.welcomeText,
        }
        print("rindex===========", id[str(id).rindex("_") + 1:])
        async_to_sync(channel_layer.group_send)(customeUser + id[str(id).rindex("_") + 1:], obj)
    return JsonResponse({'code': 0, 'msg': ""})


@login_required
def updateUserStatus(request):
    print(request.POST["state"])
    flag = False
    if request.POST["state"] == "online":
        flag = True
    models.KF.objects.filter(userid=request.user.username).update(status=flag)
    return JsonResponse({'code': 0, 'msg': ""})


# WeChat Work (enterprise account): message-receiving server configuration
from django.views.decorators.csrf import csrf_exempt


def downloadFile(data, fileType):
    '''
    result = client.media.get_url(data["MediaId"])
    file = str(uuid.uuid1()) + "." + fileType
    BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    sourceUrl = os.path.join(BASE_DIR, sourceFile)
    LocalPath = os.path.join(sourceUrl, file)  # os.path.join combines several path parts and returns the result
    urllib.request.urlretrieve(result, LocalPath)
    return "/{0}/{1}".format(sourceFile, file)
    '''
    return "OK"


class switch_wechat_messages(object):
    def case_to_function(self, case):
        # Look up "<case>Message" on this object; fall back to unknownMessage
        fun_name = str(case) + "Message"
        method = getattr(self, fun_name, self.unknownMessage)
        return method

    def textMessage(self, data):
        Content = data["Content"]

    def imageMessage(self, data):
        PicUrl = data["PicUrl"]
        print(data)

    def shortVideoMessage(self, data):
        print(data)

    def videoMessage(self, data):
        fileType = "avi"
        print(data)

    def voiceMessage(self, data):
        fileType = data["Format"]
        print(data)

    def locationMessage(self, data):
        print(data)

    def linkMessage(self, data):
        print(data)

    def eventMessage(self, data):
        AgentID = data["AgentID"]
        FromUserName = data["FromUserName"]
        createUser(FromUserName, AgentID)

    def unknownMessage(self, data):
        print(data)


def createUser(userid, AgentID):
    return "OK"
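The views above pack a service id and a username into a single string (serviceUser + str(s.id) + "_" + loginUser) and later take it apart with find, split and rindex. The sketch below shows one way to keep building and parsing in a single place. It is only an illustration: the prefix value "service_" and the helper names build_service_id and parse_service_id are assumptions, since the article never shows the real value of serviceUser.

```python
from typing import Tuple

# Hypothetical prefix; the real value of serviceUser is not shown in the article.
SERVICE_PREFIX = "service_"


def build_service_id(staff_id: int, username: str, prefix: str = SERVICE_PREFIX) -> str:
    """Build the composite id the same way departmentServices() does."""
    return "{}{}_{}".format(prefix, staff_id, username)


def parse_service_id(composite: str, prefix: str = SERVICE_PREFIX) -> Tuple[str, str]:
    """Split a composite id back into (staff_id, username).

    Splitting on the first "_" after the prefix keeps multi-digit ids intact
    and also tolerates usernames that contain underscores themselves.
    """
    if not composite.startswith(prefix):
        raise ValueError("unexpected id format: {!r}".format(composite))
    rest = composite[len(prefix):]
    staff_id, sep, username = rest.partition("_")
    if not sep or not staff_id.isdigit() or not username:
        raise ValueError("unexpected id format: {!r}".format(composite))
    return staff_id, username


if __name__ == "__main__":
    composite = build_service_id(12, "user001")
    print(composite)
    print(parse_service_id(composite))
```

With a helper like this, getWelcomeText and custome could both obtain the service id and the target username from one call instead of repeating index arithmetic.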
13.nginx.conf configuration
upstream itkf {
    server 106.54.5.14:8004;
}

location /ws/chat/ {
    proxy_pass http://itkf;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_redirect off;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Host $server_name;
}

location /itkf {
    proxy_pass http://itkf;
    proxy_set_header Host $host:$server_port;
}
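The /ws/chat/ location sets proxy_http_version 1.1 and forwards the Upgrade and Connection headers because a WebSocket connection starts as an HTTP/1.1 request carrying exactly those headers, and nginx does not pass them to the upstream by default. The following sketch, using only the standard library, builds such an opening request and computes the Sec-WebSocket-Accept value a server must answer with, as defined in RFC 6455. The host example.com and the function names are placeholders chosen for illustration.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for computing Sec-WebSocket-Accept
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def websocket_accept(key: str) -> str:
    """Return the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key."""
    digest = hashlib.sha1((key + WS_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


def handshake_request(host: str, path: str, key: str) -> str:
    """Build the HTTP/1.1 request that opens a WebSocket connection."""
    lines = [
        "GET {} HTTP/1.1".format(path),
        "Host: {}".format(host),
        "Upgrade: websocket",      # must reach the upstream, hence proxy_set_header Upgrade
        "Connection: Upgrade",     # hop-by-hop header, hence proxy_set_header Connection
        "Sec-WebSocket-Key: {}".format(key),
        "Sec-WebSocket-Version: 13",
        "",
        "",
    ]
    return "\r\n".join(lines)


if __name__ == "__main__":
    key = "dGhlIHNhbXBsZSBub25jZQ=="  # sample key from RFC 6455
    print(handshake_request("example.com", "/ws/chat/", key))
    print(websocket_accept(key))
```

If the proxy drops either header, the upstream sees an ordinary GET request and the handshake never completes, which is the usual symptom of a missing Upgrade block in the nginx configuration.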
14.supervisor_test_itkf_http.conf configuration
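[program:itkf]
environment=PUBTYPE="app",ENV="test";
command=python3 manage.py runserver 172.17.0.17:8004 --settings=itkf.test_settings ; command that starts the supervised process
directory=/itkf/ ; directory to cd into before running the command (needed here because manage.py is given as a relative path)
priority=7 ; start order: lower values start first and stop last
numprocs=1 ; number of process instances to start
autostart=true ; start together with supervisord
autorestart=true ; always restart the process when it exits
startretries=10 ; maximum number of retries when startup fails
exitcodes=0 ; expected exit codes; only consulted when autorestart=unexpected
stopsignal=KILL ; signal used to stop the process
stopwaitsecs=10 ; seconds to wait after the stop signal before supervisord sends SIGKILL
redirect_stderr=true ; redirect stderr to stdout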
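Unless otherwise stated, all articles on this site are original. If you repost this one, please credit the source: django + channels + layim: real-time one-to-one, one-to-many, and group chat - Python技术站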