from rest_framework import viewsets, generics, permissions, status from rest_framework.response import Response from rest_framework.decorators import action from rest_framework.views import APIView from rest_framework_simplejwt.views import TokenObtainPairView from rest_framework_simplejwt.serializers import TokenObtainPairSerializer from rest_framework_simplejwt.tokens import RefreshToken from django.contrib.auth import get_user_model from django.core.cache import cache import random from .serializers import ( UserSerializer, RegisterSerializer, EnterpriseSerializer, EnterpriseRegisterSerializer, EnterpriseMemberSerializer, RoleSerializer, PermissionSerializer, EnterpriseLoginSerializer, EnterpriseMemberAddSerializer, AdminUserCreateSerializer, ChangePasswordSerializer, PasswordResetRequestSerializer, PasswordResetConfirmSerializer ) from .models import Enterprise, EnterpriseMember, Role, Permission from .permissions import HasAPIPermission from rest_framework import permissions as drf_permissions class EnterpriseLoginView(APIView): """ @author: xujl Api说明: 企业管理员专属登录通道接口。校验企业身份并返回携带对应权限的JWT Token。 """ permission_classes = [permissions.AllowAny] def post(self, request, *args, **kwargs): serializer = EnterpriseLoginSerializer(data=request.data) if serializer.is_valid(): user = serializer.validated_data['user'] refresh = RefreshToken.for_user(user) return Response({ 'refresh': str(refresh), 'access': str(refresh.access_token), 'user': UserSerializer(user).data }) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) class MyTokenObtainPairSerializer(TokenObtainPairSerializer): def validate(self, attrs): username = attrs.get(self.username_field) password = attrs.get('password') from django.contrib.auth import get_user_model from django.db.models import Q from rest_framework.exceptions import AuthenticationFailed from rest_framework_simplejwt.tokens import RefreshToken from django.contrib.auth.models import update_last_login User = get_user_model() user = User.objects.filter(Q(username=username) | Q(phone=username) | Q(email=username)).first() if not user: raise AuthenticationFailed('账号未注册', code='not_registered') if not user.is_active or user.is_deleted: raise AuthenticationFailed('您的账号已被禁用,可能因为违规操作或安全风险。如有疑问请联系平台管理员。', code='account_disabled') if not user.check_password(password): raise AuthenticationFailed('密码错误,请检查后再试', code='wrong_password') refresh = RefreshToken.for_user(user) update_last_login(None, user) return { 'refresh': str(refresh), 'access': str(refresh.access_token), 'user': UserSerializer(user).data } class MyTokenObtainPairView(TokenObtainPairView): """ @author: xujl Api说明: 标准用户认证授权(JWT)接口视图。验证普通用户账号密码并下发访问凭证。 """ serializer_class = MyTokenObtainPairSerializer User = get_user_model() class UserViewSet(viewsets.ModelViewSet): """ @author: xujl Api说明: 平台用户中心接口视图。用于用户的增删改查、信息完善、封禁及状态管理,以及管理员的全局管控。 """ serializer_class = UserSerializer permission_classes = [drf_permissions.IsAuthenticated] # Custom attributes for HasAPIPermission check required_permissions = { 'GET': 'api:users:read', 'POST': 'api:users:write', 'PUT': 'api:users:write', 'PATCH': 'api:users:write', 'DELETE': 'api:users:delete' } def get_queryset(self): from django.db.models import Q from tasks.models import Task, TaskApplication, TaskStatus, ApplyStatus queryset = User.objects.filter(is_deleted=False) # Non-admins can only see OPC users if not (self.request.user.is_staff or self.request.user.is_superuser): queryset = queryset.filter(user_roles__role__code='OPC_USER') role = self.request.query_params.get('role') username = self.request.query_params.get('username') nickname = self.request.query_params.get('nickname') real_name = self.request.query_params.get('real_name') phone = self.request.query_params.get('phone') email = self.request.query_params.get('email') status_filter = self.request.query_params.get('status') idle = self.request.query_params.get('idle') if role: queryset = queryset.filter(user_roles__role__code=role) if username: queryset = queryset.filter(username__icontains=username) if nickname: queryset = queryset.filter(nickname__icontains=nickname) if real_name: queryset = queryset.filter(opc_certifications__real_name__icontains=real_name) if phone: queryset = queryset.filter(phone__icontains=phone) if email: queryset = queryset.filter(email__icontains=email) if status_filter == 'active': queryset = queryset.filter(is_active=True) elif status_filter == 'disabled': queryset = queryset.filter(is_active=False) if idle == 'true': active_app_users = TaskApplication.objects.filter( status__in=[ApplyStatus.PENDING, ApplyStatus.APPROVED, ApplyStatus.DELIVERED] ).values_list('applicant_id', flat=True) active_task_publishers = Task.objects.filter( status__in=[TaskStatus.OPEN, TaskStatus.IN_PROGRESS] ).values_list('publisher_id', flat=True) queryset = queryset.exclude(id__in=active_app_users).exclude(id__in=active_task_publishers) return queryset.order_by('-is_recommended', '-recommend_priority', '-created_at').distinct() def get_permissions(self): if self.action in ['me', 'update_profile', 'avatar', 'change_password', 'list', 'retrieve']: return [drf_permissions.IsAuthenticated()] if self.action in ['admin_create', 'toggle_status', 'reset_password', 'create', 'update', 'partial_update', 'destroy', 'update_opc_stats']: return [HasAPIPermission()] if self.action in ['request_password_reset', 'confirm_password_reset']: return [drf_permissions.AllowAny()] return super().get_permissions() @action(detail=False, methods=['post'], permission_classes=[permissions.IsAdminUser]) def admin_create(self, request): serializer = AdminUserCreateSerializer(data=request.data) if serializer.is_valid(): user = serializer.save() user.set_password(serializer.validated_data['password']) user.save() return Response(UserSerializer(user).data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) def update(self, request, *args, **kwargs): partial = kwargs.pop('partial', False) instance = self.get_object() serializer = self.get_serializer(instance, data=request.data, partial=partial) serializer.is_valid(raise_exception=True) self.perform_update(serializer) # Handle explicitly read-only fields for Admin if request.user.is_staff or request.user.is_superuser: needs_save = False for field in ['is_active', 'status', 'avatar_url', 'bio', 'location', 'face_url']: if field in request.data: setattr(instance, field, request.data[field]) needs_save = True if needs_save: instance.save() # Handle Role Assignments inside the big form if 'roles' in request.data: role_codes = request.data['roles'] from .models import Role, UserRole UserRole.objects.filter(user=instance).delete() for code in role_codes: role = Role.objects.filter(code=code).first() if role: UserRole.objects.create(user=instance, role=role, granted_by=request.user) # Handle enterprise_info updates enterprise_info = request.data.get('enterprise_info') if enterprise_info and 'ENTERPRISE' in instance.roles: from .models import Enterprise ent, created = Enterprise.objects.get_or_create(user=instance, defaults={'status': 'VERIFIED'}) ent.company_name = enterprise_info.get('company_name', ent.company_name) ent.credit_code = enterprise_info.get('credit_code', ent.credit_code) ent.business_license = enterprise_info.get('business_license', ent.business_license) ent.logo_url = enterprise_info.get('logo_url', ent.logo_url) ent.contact_name = enterprise_info.get('contact_name', ent.contact_name) ent.contact_phone = enterprise_info.get('contact_phone', ent.contact_phone) ent.contact_email = enterprise_info.get('contact_email', ent.contact_email) ent.address = enterprise_info.get('address', ent.address) ent.description = enterprise_info.get('description', ent.description) ent.save() # Handle opc_certification updates opc_certification = request.data.get('opc_certification') if opc_certification and 'OPC_USER' in instance.roles: from opc_cert.models import OpcCertification cert, created = OpcCertification.objects.get_or_create(user=instance, defaults={'status': 'APPROVED'}) cert.real_name = opc_certification.get('real_name', cert.real_name) cert.id_card = opc_certification.get('id_card', cert.id_card) cert.experience = opc_certification.get('experience', cert.experience) cert.resume_url = opc_certification.get('resume_url', cert.resume_url) cert.attachments = opc_certification.get('attachments', cert.attachments) cert.skills = opc_certification.get('skills', cert.skills) cert.status = opc_certification.get('status', cert.status) if 'rating' in opc_certification: cert.rating = opc_certification.get('rating') cert.save() if 'rating' in opc_certification: instance.rating = opc_certification.get('rating') instance.save() if getattr(instance, '_prefetched_objects_cache', None): instance._prefetched_objects_cache = {} return Response(self.get_serializer(instance).data) @action(detail=False, methods=['post']) def change_password(self, request): serializer = ChangePasswordSerializer(data=request.data) if serializer.is_valid(): user = request.user if not user.check_password(serializer.validated_data['old_password']): return Response({'old_password': ['原密码错误']}, status=status.HTTP_400_BAD_REQUEST) user.set_password(serializer.validated_data['new_password']) user.save() return Response({'status': '密码修改成功'}) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @action(detail=False, methods=['post'], permission_classes=[drf_permissions.AllowAny]) def request_password_reset(self, request): serializer = PasswordResetRequestSerializer(data=request.data) if serializer.is_valid(): email = serializer.validated_data['email'] user = User.objects.filter(email=email).first() if user: # Mock sending code: Generate 6-digit code and store in cache code = ''.join(random.choices('0123456789', k=6)) cache.set(f'pwd_reset_{email}', code, timeout=300) # 5 minutes # In production, send via Email/SMS. Here we return it or print it. print(f"--- MOCK EMAIL --- Password reset code for {email}: {code}") return Response({'status': '验证码已发送 (开发环境请查看控制台输出)'}) else: # To prevent email enumeration, still return success return Response({'status': '验证码已发送 (开发环境请查看控制台输出)'}) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @action(detail=False, methods=['post'], permission_classes=[drf_permissions.AllowAny]) def confirm_password_reset(self, request): serializer = PasswordResetConfirmSerializer(data=request.data) if serializer.is_valid(): email = serializer.validated_data['email'] code = serializer.validated_data['verification_code'] cached_code = cache.get(f'pwd_reset_{email}') if not cached_code or cached_code != code: return Response({'verification_code': ['验证码无效或已过期']}, status=status.HTTP_400_BAD_REQUEST) user = User.objects.filter(email=email).first() if user: user.set_password(serializer.validated_data['new_password']) user.save() cache.delete(f'pwd_reset_{email}') return Response({'status': '密码重置成功'}) return Response({'detail': '用户不存在'}, status=status.HTTP_404_NOT_FOUND) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @action(detail=True, methods=['post']) def reset_password(self, request, pk=None): user = self.get_object() new_password = request.data.get('password', '123456') user.set_password(new_password) user.save() return Response({'status': '密码已重置'}) @action(detail=False, methods=['get']) def me(self, request): serializer = self.get_serializer(request.user) return Response(serializer.data) def perform_destroy(self, instance): from tasks.models import Task, TaskApplication, TaskStatus, ApplyStatus from django.utils import timezone force = self.request.query_params.get('force', '').lower() == 'true' # Check active expert applications active_apps = TaskApplication.objects.filter( applicant=instance, status__in=[ApplyStatus.PENDING, ApplyStatus.APPROVED, ApplyStatus.DELIVERED] ) # Check active enterprise tasks active_tasks = Task.objects.filter( publisher=instance, status__in=[TaskStatus.OPEN, TaskStatus.IN_PROGRESS] ) has_blockers = active_apps.exists() or active_tasks.exists() if has_blockers and not force: from rest_framework.exceptions import ValidationError blocker_details = [] if active_apps.exists(): blocker_details.append(f'专家接单 {active_apps.count()} 项') if active_tasks.exists(): blocker_details.append(f'发布任务 {active_tasks.count()} 项') raise ValidationError( f'用户 {instance.username} 有正在进行中的任务({"、".join(blocker_details)}),无法直接删除。请先取消相关任务或强制删除。' ) if has_blockers and force: # Lock tasks instead of cancelling — mark as anomalous # For tasks where this user is the ONLY active participant, set CANCELLED # Otherwise keep IN_PROGRESS but lock the user's application for app in active_apps: app.status = ApplyStatus.REJECTED app.reject_reason = f'用户 {instance.nickname or instance.username} 账户已被管理员删除/锁定' app.save() # Check if task still has other active participants other_apps = app.task.applications.filter( status__in=[ApplyStatus.APPROVED, ApplyStatus.DELIVERED] ).exclude(applicant=instance) if not other_apps.exists() and app.task.status == TaskStatus.IN_PROGRESS: app.task.status = TaskStatus.CANCELLED app.task.cancel_reason = f'唯一执行专家 {instance.nickname or instance.username} 已被系统锁定,任务异常中止' app.task.cancelled_at = timezone.now() app.task.save() active_tasks.update( status=TaskStatus.CANCELLED, cancelled_at=timezone.now(), cancel_reason=f'管理员强制删除用户 {instance.username},系统自动取消关联任务' ) # Check and cascade close enterprise enterprise = Enterprise.objects.filter(user=instance, is_deleted=False).first() if enterprise: # Close enterprise: soft delete + remove all members enterprise.is_deleted = True enterprise.status = 'REJECTED' enterprise.save() EnterpriseMember.objects.filter(enterprise=enterprise).delete() # Cancel enterprise's published tasks Task.objects.filter( enterprise=enterprise, status__in=[TaskStatus.OPEN, TaskStatus.IN_PROGRESS] ).update( status=TaskStatus.CANCELLED, cancelled_at=timezone.now(), cancel_reason=f'企业 {enterprise.company_name} 已被清退,关联任务自动取消' ) instance.is_deleted = True instance.is_active = False # Free up unique fields for re-registration while preserving for audit deleted_suffix = f'__deleted_{instance.id}' if instance.phone: instance.phone = instance.phone + deleted_suffix if instance.email: instance.email = instance.email + deleted_suffix instance.username = instance.username + deleted_suffix instance.save() @action(detail=False, methods=['post'], permission_classes=[permissions.IsAdminUser]) def batch_delete(self, request): user_ids = request.data.get('user_ids', []) force = request.data.get('force', False) if not user_ids: return Response({'detail': '缺少 user_ids'}, status=status.HTTP_400_BAD_REQUEST) users = User.objects.filter(id__in=user_ids) from tasks.models import Task, TaskApplication, TaskStatus, ApplyStatus from django.utils import timezone blocked_users = [] for user in users: active_apps = TaskApplication.objects.filter( applicant=user, status__in=[ApplyStatus.PENDING, ApplyStatus.APPROVED, ApplyStatus.DELIVERED] ) active_tasks = Task.objects.filter( publisher=user, status__in=[TaskStatus.OPEN, TaskStatus.IN_PROGRESS] ) if active_apps.exists() or active_tasks.exists(): blocked_users.append({ 'id': str(user.id), 'username': user.username, 'nickname': user.nickname or user.username, 'app_count': active_apps.count(), 'task_count': active_tasks.count(), }) if blocked_users and not force: return Response({ 'detail': f'共 {len(blocked_users)} 个用户有正在进行中的任务,无法直接批量删除。', 'blocked_users': blocked_users, 'has_blockers': True, }, status=status.HTTP_400_BAD_REQUEST) if blocked_users and force: for user in users: TaskApplication.objects.filter( applicant=user, status__in=[ApplyStatus.PENDING, ApplyStatus.APPROVED, ApplyStatus.DELIVERED] ).update(status=ApplyStatus.WITHDRAWN) Task.objects.filter( publisher=user, status__in=[TaskStatus.OPEN, TaskStatus.IN_PROGRESS] ).update( status=TaskStatus.CANCELLED, cancelled_at=timezone.now(), cancel_reason=f'管理员批量强制删除,系统自动取消关联任务' ) for user in users: user.is_deleted = True user.is_active = False deleted_suffix = f'__deleted_{user.id}' if user.phone: user.phone = user.phone + deleted_suffix if user.email: user.email = user.email + deleted_suffix user.username = user.username + deleted_suffix user.save() return Response({'status': '批量删除成功'}) @action(detail=True, methods=['post'], permission_classes=[permissions.IsAdminUser]) def assign_roles(self, request, pk=None): user = self.get_object() role_codes = request.data.get('roles', []) from .models import Role, UserRole # 清除旧角色 UserRole.objects.filter(user=user).delete() # 重新分配 for code in role_codes: role = Role.objects.filter(code=code).first() if role: UserRole.objects.create(user=user, role=role, granted_by=request.user) return Response({'status': '角色分配成功'}) @action(detail=True, methods=['post'], permission_classes=[permissions.IsAdminUser]) def update_rating(self, request, pk=None): user = self.get_object() rating = request.data.get('rating') if rating is not None: user.rating = rating user.save() return Response({'status': '评分已更新'}) return Response({'detail': '缺少评分参数'}, status=status.HTTP_400_BAD_REQUEST) @action(detail=True, methods=['get'], permission_classes=[permissions.IsAdminUser]) def active_tasks(self, request, pk=None): user = self.get_object() from tasks.models import Task, TaskApplication, TaskStatus, ApplyStatus results = [] # Expert tasks expert_apps = TaskApplication.objects.filter( applicant=user, status__in=[ApplyStatus.PENDING, ApplyStatus.APPROVED, ApplyStatus.DELIVERED] ) for app in expert_apps: results.append({ 'id': app.task.id, 'title': app.task.title, 'role': '专家接单', 'status': app.status, 'task_status': app.task.status, 'type': 'application' }) # Publisher tasks pub_tasks = Task.objects.filter( publisher=user, status__in=[TaskStatus.OPEN, TaskStatus.IN_PROGRESS] ) for t in pub_tasks: results.append({ 'id': t.id, 'title': t.title, 'role': '任务发布方', 'status': t.status, 'task_status': t.status, 'type': 'task' }) return Response(results) @action(detail=False, methods=['put']) def update_profile(self, request): user = request.user serializer = UserSerializer(user, data=request.data, partial=True) if serializer.is_valid(): serializer.save() return Response(serializer.data) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @action(detail=True, methods=['post'], permission_classes=[permissions.IsAdminUser]) def toggle_status(self, request, pk=None): user = self.get_object() user.is_active = not user.is_active user.save() return Response({'status': '状态已更新', 'is_active': user.is_active}) @action(detail=True, methods=['put'], permission_classes=[permissions.IsAdminUser]) def update_opc_stats(self, request, pk=None): user = self.get_object() rating = request.data.get('rating') completed_tasks = request.data.get('completed_tasks') if rating is not None: user.rating = rating if completed_tasks is not None: user.completed_tasks = completed_tasks user.save() return Response(UserSerializer(user).data) @action(detail=True, methods=['post'], permission_classes=[permissions.IsAdminUser]) def toggle_recommend(self, request, pk=None): user = self.get_object() user.is_recommended = request.data.get('is_recommended', not user.is_recommended) user.recommend_priority = request.data.get('recommend_priority', user.recommend_priority) user.save(update_fields=['is_recommended', 'recommend_priority']) return Response({'is_recommended': user.is_recommended, 'recommend_priority': user.recommend_priority}) @action(detail=False, methods=['get'], permission_classes=[permissions.IsAdminUser]) def deleted_users(self, request): """List soft-deleted users for admin review/recovery.""" deleted = User.objects.filter(is_deleted=True).order_by('-updated_at') page = self.paginate_queryset(deleted) if page is not None: serializer = UserSerializer(page, many=True) return self.get_paginated_response(serializer.data) serializer = UserSerializer(deleted, many=True) return Response(serializer.data) @action(detail=True, methods=['post'], permission_classes=[permissions.IsAdminUser]) def restore(self, request, pk=None): """Restore a soft-deleted user by clearing the deletion suffix.""" try: user = User.objects.get(id=pk, is_deleted=True) except User.DoesNotExist: return Response({'detail': '未找到已删除的用户'}, status=status.HTTP_404_NOT_FOUND) deleted_suffix = f'__deleted_{user.id}' # Restore unique fields if user.username.endswith(deleted_suffix): original_username = user.username[:-len(deleted_suffix)] if User.objects.filter(username=original_username, is_deleted=False).exists(): return Response({'detail': f'用户名 {original_username} 已被新用户占用,无法恢复'}, status=status.HTTP_400_BAD_REQUEST) user.username = original_username if user.phone and user.phone.endswith(deleted_suffix): original_phone = user.phone[:-len(deleted_suffix)] if User.objects.filter(phone=original_phone, is_deleted=False).exists(): return Response({'detail': f'手机号 {original_phone} 已被新用户占用,无法恢复'}, status=status.HTTP_400_BAD_REQUEST) user.phone = original_phone if user.email and user.email.endswith(deleted_suffix): original_email = user.email[:-len(deleted_suffix)] if User.objects.filter(email=original_email, is_deleted=False).exists(): return Response({'detail': f'邮箱 {original_email} 已被新用户占用,无法恢复'}, status=status.HTTP_400_BAD_REQUEST) user.email = original_email user.is_deleted = False user.is_active = True user.save() return Response({'status': f'用户 {user.username} 已成功恢复'}) @action(detail=False, methods=['post']) def avatar(self, request): # In a real app, handle file upload to MinIO # For now, we expect a URL or just mock it user = request.user avatar_url = request.data.get('avatar_url') if avatar_url: user.avatar_url = avatar_url user.save() return Response({'status': '头像已更新', 'avatar_url': avatar_url}) return Response({'detail': '缺少 avatar_url'}, status=status.HTTP_400_BAD_REQUEST) class RegisterView(generics.CreateAPIView): """ @author: xujl Api说明: 平台普通用户注册接口视图。提供基础的账号密码/手机/邮箱注册流程。 """ queryset = User.objects.all() permission_classes = [permissions.AllowAny] serializer_class = RegisterSerializer class EnterpriseViewSet(viewsets.ModelViewSet): """ @author: xujl Api说明: 企业资质及账号管理接口视图。企业用户提交工商资料认证、基本信息更新,管理员对企业入驻进行审核。 """ queryset = Enterprise.objects.filter(is_deleted=False).order_by('-created_at') permission_classes = [permissions.IsAuthenticated] def get_serializer_class(self): if self.action == 'create' and (self.request.user.is_staff or self.request.user.is_superuser): from .serializers import AdminEnterpriseCreateSerializer return AdminEnterpriseCreateSerializer from .serializers import EnterpriseSerializer return EnterpriseSerializer required_permissions = { 'GET': 'api:enterprises:read', 'POST': 'api:enterprises:write', 'PUT': 'api:enterprises:write', 'PATCH': 'api:enterprises:write', 'DELETE': 'api:enterprises:delete' } def get_permissions(self): if self.action in ['me', 'create', 'update', 'partial_update']: return [drf_permissions.IsAuthenticated()] if self.action in ['list', 'retrieve', 'verify_status']: return [HasAPIPermission()] return super().get_permissions() @action(detail=False, methods=['get']) def me(self, request): try: # First check if user is the enterprise owner enterprise = Enterprise.objects.filter(user=request.user).first() role = 'ADMIN' if not enterprise: # If not owner, check if user is a member member = EnterpriseMember.objects.filter(user=request.user).first() if member: enterprise = member.enterprise role = member.role else: return Response({'detail': '未找到企业信息'}, status=status.HTTP_404_NOT_FOUND) serializer = self.get_serializer(enterprise) data = serializer.data data['my_role'] = role return Response(data) except Exception as e: return Response({'detail': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) def perform_create(self, serializer): if self.request.user.is_staff or self.request.user.is_superuser: serializer.save() else: enterprise = serializer.save(user=self.request.user) # Auto assign ENTERPRISE role from .models import Role, UserRole role, created = Role.objects.get_or_create(code='ENTERPRISE', defaults={'name': '企业用户'}) UserRole.objects.get_or_create(user=self.request.user, role=role) def _check_admin_permission(self, request, enterprise): if request.user.is_superuser or request.user.is_staff: return True if request.user == enterprise.user: return True member = EnterpriseMember.objects.filter(enterprise=enterprise, user=request.user).first() return member and member.role == 'ADMIN' def update(self, request, *args, **kwargs): enterprise = self.get_object() if not self._check_admin_permission(request, enterprise): return Response({'detail': '只有企业管理员可以编辑企业数据'}, status=status.HTTP_403_FORBIDDEN) return super().update(request, *args, **kwargs) def partial_update(self, request, *args, **kwargs): enterprise = self.get_object() if not self._check_admin_permission(request, enterprise): return Response({'detail': '只有企业管理员可以编辑企业数据'}, status=status.HTTP_403_FORBIDDEN) return super().partial_update(request, *args, **kwargs) def perform_destroy(self, instance): """Cascade enterprise deletion: soft-delete, remove members, cancel active tasks, free email.""" from tasks.models import Task, TaskStatus from django.utils import timezone # Cancel all active tasks for this enterprise Task.objects.filter( enterprise=instance, status__in=[TaskStatus.OPEN, TaskStatus.IN_PROGRESS] ).update( status=TaskStatus.CANCELLED, cancelled_at=timezone.now(), cancel_reason=f'企业 {instance.company_name} 已被清退,项目中止' ) # Remove all team members EnterpriseMember.objects.filter(enterprise=instance).delete() # Free up the enterprise user's email for re-use (suffix pattern) owner = instance.user suffix = f'__deleted_{str(instance.id)[:8]}' if owner.email and '__deleted_' not in owner.email: owner.email = owner.email + suffix owner.save(update_fields=['email']) # Free up credit_code if instance.credit_code and '__deleted_' not in instance.credit_code: instance.credit_code = instance.credit_code + suffix # Soft delete enterprise instance.is_deleted = True instance.status = 'REJECTED' instance.save() @action(detail=False, methods=['get']) def deleted_enterprises(self, request): """List soft-deleted enterprises for the recycle bin.""" deleted = Enterprise.objects.filter(is_deleted=True).order_by('-updated_at') serializer = self.get_serializer(deleted, many=True) return Response(serializer.data) @action(detail=True, methods=['post']) def restore(self, request, pk=None): """Restore a soft-deleted enterprise.""" enterprise = Enterprise.objects.filter(pk=pk, is_deleted=True).first() if not enterprise: return Response({'detail': '未找到已删除的企业'}, status=status.HTTP_404_NOT_FOUND) # Restore credit_code if enterprise.credit_code and '__deleted_' in enterprise.credit_code: original_code = enterprise.credit_code.split('__deleted_')[0] if Enterprise.objects.filter(credit_code=original_code, is_deleted=False).exists(): return Response({'detail': f'统一社会信用代码 {original_code} 已被其他企业占用'}, status=status.HTTP_409_CONFLICT) enterprise.credit_code = original_code # Restore owner email owner = enterprise.user if owner.email and '__deleted_' in owner.email: original_email = owner.email.split('__deleted_')[0] if User.objects.filter(email=original_email, is_deleted=False).exclude(pk=owner.pk).exists(): return Response({'detail': f'邮箱 {original_email} 已被其他用户占用,无法恢复'}, status=status.HTTP_409_CONFLICT) owner.email = original_email owner.save(update_fields=['email']) enterprise.is_deleted = False enterprise.status = 'VERIFIED' enterprise.save() return Response({'detail': '企业已恢复'}) @action(detail=True, methods=['post']) def verify_status(self, request, pk=None): enterprise = self.get_object() status_val = request.data.get('status') if status_val not in [choice[0] for choice in Enterprise.EnterpriseStatus.choices]: return Response({'detail': '无效的状态'}, status=status.HTTP_400_BAD_REQUEST) enterprise.status = status_val enterprise.save() return Response({'status': '企业审核状态已更新', 'enterprise_status': enterprise.status}) class EnterpriseRegisterView(generics.CreateAPIView): """ @author: xujl Api说明: 企业级账号入驻/注册接口视图。提交企业信用代码和法人代表等认证资料。 """ serializer_class = EnterpriseRegisterSerializer permission_classes = [permissions.AllowAny] def create(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) if serializer.is_valid(): serializer.save() return Response({'status': '企业注册成功'}, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) class EnterpriseMemberViewSet(viewsets.ModelViewSet): """ @author: xujl Api说明: 企业内部成员管理接口视图。用于给企业内部关联多个员工账号并分配角色(管理员、普通成员)。 """ serializer_class = EnterpriseMemberSerializer permission_classes = [permissions.IsAuthenticated] def get_queryset(self): if self.request.user.is_staff or self.request.user.is_superuser: enterprise_id = self.request.query_params.get('enterprise') if enterprise_id: return EnterpriseMember.objects.filter(enterprise_id=enterprise_id).order_by('-joined_at') return EnterpriseMember.objects.all().order_by('-joined_at') try: enterprise = Enterprise.objects.get(user=self.request.user) return EnterpriseMember.objects.filter(enterprise=enterprise).order_by('-joined_at') except Enterprise.DoesNotExist: return EnterpriseMember.objects.none() def perform_create(self, serializer): enterprise = Enterprise.objects.get(user=self.request.user) serializer.save(enterprise=enterprise) @action(detail=False, methods=['post'], permission_classes=[permissions.IsAuthenticated]) def add_member(self, request): serializer = EnterpriseMemberAddSerializer(data=request.data, context={'request': request}) if serializer.is_valid(): member = serializer.save() return Response(EnterpriseMemberSerializer(member).data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @action(detail=False, methods=['post'], permission_classes=[permissions.IsAdminUser]) def batch_delete(self, request): member_ids = request.data.get('member_ids', []) if not member_ids: return Response({'detail': '缺少 member_ids'}, status=status.HTTP_400_BAD_REQUEST) EnterpriseMember.objects.filter(id__in=member_ids).delete() return Response({'status': '批量删除成功'}) @action(detail=False, methods=['post']) def invite(self, request): email = request.data.get('email') role = request.data.get('role', 'MEMBER') try: enterprise = Enterprise.objects.get(user=request.user) user = User.objects.get(email=email) if EnterpriseMember.objects.filter(enterprise=enterprise, user=user).exists(): return Response({'detail': '该用户已在团队中'}, status=status.HTTP_400_BAD_REQUEST) member = EnterpriseMember.objects.create( enterprise=enterprise, user=user, role=role ) return Response(EnterpriseMemberSerializer(member).data, status=status.HTTP_201_CREATED) except Enterprise.DoesNotExist: return Response({'detail': '未找到企业信息'}, status=status.HTTP_404_NOT_FOUND) except User.DoesNotExist: return Response({'detail': '该用户尚未注册平台,请先引导其注册'}, status=status.HTTP_404_NOT_FOUND) class RoleViewSet(viewsets.ModelViewSet): """ @author: xujl Api说明: RBAC 角色管理接口视图。用于定义平台角色类别,以及为用户进行角色的授予和回收。 """ queryset = Role.objects.all().order_by('sort_order', 'created_at') serializer_class = RoleSerializer pagination_class = None # Return all roles without pagination permission_classes = [HasAPIPermission] required_permissions = { 'GET': 'menu:account:roles', 'POST': 'menu:account:roles', 'PUT': 'menu:account:roles', 'DELETE': 'menu:account:roles' } @action(detail=True, methods=['post']) def assign_permissions(self, request, pk=None): role = self.get_object() permission_ids = request.data.get('permissions', []) from .models import Permission, RolePermission RolePermission.objects.filter(role=role).delete() for pid in permission_ids: perm = Permission.objects.filter(id=pid).first() if perm: RolePermission.objects.create(role=role, permission=perm) return Response({'status': '权限分配成功'}) class PermissionViewSet(viewsets.ModelViewSet): """ @author: xujl Api说明: RBAC 权限节点管理接口视图。用于查询系统的路由、菜单及按钮级别的权限树结构。 """ queryset = Permission.objects.all().order_by('sort_order', 'created_at') serializer_class = PermissionSerializer pagination_class = None # Return all permissions without pagination permission_classes = [HasAPIPermission] required_permissions = { 'GET': 'menu:account:permissions', 'POST': 'menu:account:permissions', 'PUT': 'menu:account:permissions', 'DELETE': 'menu:account:permissions' }