Files
opc-backend/opc_cert/views.py
xujl 23855ef0e4 开发了多角色登录与鉴权接口:实现了普通用户、企业和管理员的登录分流,并支持Token验证。
开发了权限控制接口:实现了通过数据库分配菜单权限节点,控制接口访问安全。
开发了实名认证中心:实现了个人身份证信息与企业营业执照的提交与审核接口。
开发了任务与协作大厅核心业务:实现了任务的发布、接单、状态流转以及专家邀约接口。
配置了全局环境变量与数据库引擎:集成了 PostgreSQL 数据库、Redis 缓存与 MinIO 对象存储。
2026-04-28 16:32:02 +08:00

92 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from rest_framework import viewsets, permissions, status
from rest_framework.response import Response
from rest_framework.decorators import action
from .models import OpcCertification, CertStatus
from .serializers import OpcCertificationSerializer
from django.utils import timezone
class OpcCertificationViewSet(viewsets.ModelViewSet):
"""
@author: xujl
Api说明: OPCOne Person Company专家认证申请接口视图。提供普通用户提交资质申请、管理员审核(approve)、驳回(reject)等核心流程。认证通过后自动授予OPC_USER角色。
"""
queryset = OpcCertification.objects.all()
serializer_class = OpcCertificationSerializer
permission_classes = [permissions.IsAuthenticated]
required_permissions = {
'GET': 'api:certs:read',
'POST': 'api:certs:write',
'PUT': 'api:certs:write',
'PATCH': 'api:certs:write',
'DELETE': 'api:certs:delete'
}
def get_permissions(self):
if self.action in ['create', 'destroy']:
return [permissions.IsAuthenticated()]
if self.action in ['list', 'retrieve']:
# Either it's their own, or they have admin permission
return [permissions.IsAuthenticated()]
if self.action in ['approve', 'reject']:
from users.permissions import HasAPIPermission
return [HasAPIPermission()]
return super().get_permissions()
def get_queryset(self):
user = self.request.user
from users.permissions import HasAPIPermission
# If user has the certs read permission or is superuser, show all
has_admin_perm = False
if user.is_superuser:
has_admin_perm = True
else:
from users.models import RolePermission
perms = set(RolePermission.objects.filter(role__userrole__user=user).values_list('permission__code', flat=True))
if 'api:certs:read' in perms or '*' in perms:
has_admin_perm = True
if has_admin_perm:
return OpcCertification.objects.all().order_by('-created_at')
return OpcCertification.objects.filter(user=user).order_by('-created_at')
def perform_create(self, serializer):
# 提交新申请前,只删除该用户的 PENDING 或 REJECTED 记录,保留已有的 APPROVED 记录
OpcCertification.objects.filter(user=self.request.user).exclude(status=CertStatus.APPROVED).delete()
serializer.save(user=self.request.user, status=CertStatus.PENDING)
@action(detail=True, methods=['post'])
def approve(self, request, pk=None):
cert = self.get_object()
if cert.status != CertStatus.PENDING:
return Response({'detail': '状态不允许该操作'}, status=status.HTTP_400_BAD_REQUEST)
cert.status = CertStatus.APPROVED
cert.reviewer = request.user
cert.reviewed_at = timezone.now()
cert.save()
# 管理员通过后,删除该用户所有的旧认证记录(包括旧的 APPROVED 记录),确保数据库只有最新的一条
OpcCertification.objects.filter(user=cert.user).exclude(id=cert.id).delete()
# 自动追加 OPC_USER 角色
from users.models import Role, UserRole
role_opc, _ = Role.objects.get_or_create(code='OPC_USER', defaults={'name': '认证专家', 'is_system': True})
UserRole.objects.get_or_create(user=cert.user, role=role_opc, defaults={'granted_by': request.user})
return Response({'status': '认证已通过,用户角色已更新'})
@action(detail=True, methods=['post'])
def reject(self, request, pk=None):
cert = self.get_object()
reason = request.data.get('reject_reason', '不符合要求')
cert.status = CertStatus.REJECTED
cert.reject_reason = reason
cert.reviewer = request.user
cert.reviewed_at = timezone.now()
cert.save()
return Response({'status': '认证已驳回'})