from rest_framework import viewsets, permissions, status from rest_framework.response import Response from rest_framework.decorators import action from .models import OpcCertification, CertStatus from .serializers import OpcCertificationSerializer from django.utils import timezone class OpcCertificationViewSet(viewsets.ModelViewSet): """ @author: xujl Api说明: OPC(One Person Company)专家认证申请接口视图。提供普通用户提交资质申请、管理员审核(approve)、驳回(reject)等核心流程。认证通过后自动授予OPC_USER角色。 """ queryset = OpcCertification.objects.all() serializer_class = OpcCertificationSerializer permission_classes = [permissions.IsAuthenticated] required_permissions = { 'GET': 'api:certs:read', 'POST': 'api:certs:write', 'PUT': 'api:certs:write', 'PATCH': 'api:certs:write', 'DELETE': 'api:certs:delete' } def get_permissions(self): if self.action in ['create', 'destroy']: return [permissions.IsAuthenticated()] if self.action in ['list', 'retrieve']: # Either it's their own, or they have admin permission return [permissions.IsAuthenticated()] if self.action in ['approve', 'reject']: from users.permissions import HasAPIPermission return [HasAPIPermission()] return super().get_permissions() def get_queryset(self): user = self.request.user from users.permissions import HasAPIPermission # If user has the certs read permission or is superuser, show all has_admin_perm = False if user.is_superuser: has_admin_perm = True else: from users.models import RolePermission perms = set(RolePermission.objects.filter(role__userrole__user=user).values_list('permission__code', flat=True)) if 'api:certs:read' in perms or '*' in perms: has_admin_perm = True if has_admin_perm: return OpcCertification.objects.all().order_by('-created_at') return OpcCertification.objects.filter(user=user).order_by('-created_at') def perform_create(self, serializer): # 提交新申请前,只删除该用户的 PENDING 或 REJECTED 记录,保留已有的 APPROVED 记录 OpcCertification.objects.filter(user=self.request.user).exclude(status=CertStatus.APPROVED).delete() serializer.save(user=self.request.user, status=CertStatus.PENDING) @action(detail=True, methods=['post']) def approve(self, request, pk=None): cert = self.get_object() if cert.status != CertStatus.PENDING: return Response({'detail': '状态不允许该操作'}, status=status.HTTP_400_BAD_REQUEST) cert.status = CertStatus.APPROVED cert.reviewer = request.user cert.reviewed_at = timezone.now() cert.save() # 管理员通过后,删除该用户所有的旧认证记录(包括旧的 APPROVED 记录),确保数据库只有最新的一条 OpcCertification.objects.filter(user=cert.user).exclude(id=cert.id).delete() # 自动追加 OPC_USER 角色 from users.models import Role, UserRole role_opc, _ = Role.objects.get_or_create(code='OPC_USER', defaults={'name': '认证专家', 'is_system': True}) UserRole.objects.get_or_create(user=cert.user, role=role_opc, defaults={'granted_by': request.user}) return Response({'status': '认证已通过,用户角色已更新'}) @action(detail=True, methods=['post']) def reject(self, request, pk=None): cert = self.get_object() reason = request.data.get('reject_reason', '不符合要求') cert.status = CertStatus.REJECTED cert.reject_reason = reason cert.reviewer = request.user cert.reviewed_at = timezone.now() cert.save() return Response({'status': '认证已驳回'})