Files
opc-backend/opc_cert/views.py

92 lines
3.9 KiB
Python
Raw Permalink Normal View History

from rest_framework import viewsets, permissions, status
from rest_framework.response import Response
from rest_framework.decorators import action
from .models import OpcCertification, CertStatus
from .serializers import OpcCertificationSerializer
from django.utils import timezone
class OpcCertificationViewSet(viewsets.ModelViewSet):
"""
@author: xujl
Api说明: OPCOne Person Company专家认证申请接口视图提供普通用户提交资质申请管理员审核(approve)驳回(reject)等核心流程认证通过后自动授予OPC_USER角色
"""
queryset = OpcCertification.objects.all()
serializer_class = OpcCertificationSerializer
permission_classes = [permissions.IsAuthenticated]
required_permissions = {
'GET': 'api:certs:read',
'POST': 'api:certs:write',
'PUT': 'api:certs:write',
'PATCH': 'api:certs:write',
'DELETE': 'api:certs:delete'
}
def get_permissions(self):
if self.action in ['create', 'destroy']:
return [permissions.IsAuthenticated()]
if self.action in ['list', 'retrieve']:
# Either it's their own, or they have admin permission
return [permissions.IsAuthenticated()]
if self.action in ['approve', 'reject']:
from users.permissions import HasAPIPermission
return [HasAPIPermission()]
return super().get_permissions()
def get_queryset(self):
user = self.request.user
from users.permissions import HasAPIPermission
# If user has the certs read permission or is superuser, show all
has_admin_perm = False
if user.is_superuser:
has_admin_perm = True
else:
from users.models import RolePermission
perms = set(RolePermission.objects.filter(role__userrole__user=user).values_list('permission__code', flat=True))
if 'api:certs:read' in perms or '*' in perms:
has_admin_perm = True
if has_admin_perm:
return OpcCertification.objects.all().order_by('-created_at')
return OpcCertification.objects.filter(user=user).order_by('-created_at')
def perform_create(self, serializer):
# 提交新申请前,只删除该用户的 PENDING 或 REJECTED 记录,保留已有的 APPROVED 记录
OpcCertification.objects.filter(user=self.request.user).exclude(status=CertStatus.APPROVED).delete()
serializer.save(user=self.request.user, status=CertStatus.PENDING)
@action(detail=True, methods=['post'])
def approve(self, request, pk=None):
cert = self.get_object()
if cert.status != CertStatus.PENDING:
return Response({'detail': '状态不允许该操作'}, status=status.HTTP_400_BAD_REQUEST)
cert.status = CertStatus.APPROVED
cert.reviewer = request.user
cert.reviewed_at = timezone.now()
cert.save()
# 管理员通过后,删除该用户所有的旧认证记录(包括旧的 APPROVED 记录),确保数据库只有最新的一条
OpcCertification.objects.filter(user=cert.user).exclude(id=cert.id).delete()
# 自动追加 OPC_USER 角色
from users.models import Role, UserRole
role_opc, _ = Role.objects.get_or_create(code='OPC_USER', defaults={'name': '认证专家', 'is_system': True})
UserRole.objects.get_or_create(user=cert.user, role=role_opc, defaults={'granted_by': request.user})
return Response({'status': '认证已通过,用户角色已更新'})
@action(detail=True, methods=['post'])
def reject(self, request, pk=None):
cert = self.get_object()
reason = request.data.get('reject_reason', '不符合要求')
cert.status = CertStatus.REJECTED
cert.reject_reason = reason
cert.reviewer = request.user
cert.reviewed_at = timezone.now()
cert.save()
return Response({'status': '认证已驳回'})