Files
opc-backend/users/views.py

901 lines
41 KiB
Python
Raw Permalink Normal View History

from rest_framework import viewsets, generics, permissions, status
from rest_framework.response import Response
from rest_framework.decorators import action
from rest_framework.views import APIView
from rest_framework_simplejwt.views import TokenObtainPairView
from rest_framework_simplejwt.serializers import TokenObtainPairSerializer
from rest_framework_simplejwt.tokens import RefreshToken
from django.contrib.auth import get_user_model
from django.core.cache import cache
import random
from .serializers import (
UserSerializer, RegisterSerializer, EnterpriseSerializer, EnterpriseRegisterSerializer,
EnterpriseMemberSerializer, RoleSerializer, PermissionSerializer, EnterpriseLoginSerializer,
EnterpriseMemberAddSerializer, AdminUserCreateSerializer, ChangePasswordSerializer,
PasswordResetRequestSerializer, PasswordResetConfirmSerializer
)
from .models import Enterprise, EnterpriseMember, Role, Permission
from .permissions import HasAPIPermission
from rest_framework import permissions as drf_permissions
class EnterpriseLoginView(APIView):
"""
@author: xujl
Api说明: 企业管理员专属登录通道接口校验企业身份并返回携带对应权限的JWT Token
"""
permission_classes = [permissions.AllowAny]
def post(self, request, *args, **kwargs):
serializer = EnterpriseLoginSerializer(data=request.data)
if serializer.is_valid():
user = serializer.validated_data['user']
refresh = RefreshToken.for_user(user)
return Response({
'refresh': str(refresh),
'access': str(refresh.access_token),
'user': UserSerializer(user).data
})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class MyTokenObtainPairSerializer(TokenObtainPairSerializer):
def validate(self, attrs):
username = attrs.get(self.username_field)
password = attrs.get('password')
from django.contrib.auth import get_user_model
from django.db.models import Q
from rest_framework.exceptions import AuthenticationFailed
from rest_framework_simplejwt.tokens import RefreshToken
from django.contrib.auth.models import update_last_login
User = get_user_model()
user = User.objects.filter(Q(username=username) | Q(phone=username) | Q(email=username)).first()
if not user:
raise AuthenticationFailed('账号未注册', code='not_registered')
if not user.is_active or user.is_deleted:
raise AuthenticationFailed('您的账号已被禁用,可能因为违规操作或安全风险。如有疑问请联系平台管理员。', code='account_disabled')
if not user.check_password(password):
raise AuthenticationFailed('密码错误,请检查后再试', code='wrong_password')
refresh = RefreshToken.for_user(user)
update_last_login(None, user)
return {
'refresh': str(refresh),
'access': str(refresh.access_token),
'user': UserSerializer(user).data
}
class MyTokenObtainPairView(TokenObtainPairView):
"""
@author: xujl
Api说明: 标准用户认证授权(JWT)接口视图验证普通用户账号密码并下发访问凭证
"""
serializer_class = MyTokenObtainPairSerializer
User = get_user_model()
class UserViewSet(viewsets.ModelViewSet):
"""
@author: xujl
Api说明: 平台用户中心接口视图用于用户的增删改查信息完善封禁及状态管理以及管理员的全局管控
"""
serializer_class = UserSerializer
permission_classes = [drf_permissions.IsAuthenticated]
# Custom attributes for HasAPIPermission check
required_permissions = {
'GET': 'api:users:read',
'POST': 'api:users:write',
'PUT': 'api:users:write',
'PATCH': 'api:users:write',
'DELETE': 'api:users:delete'
}
def get_queryset(self):
from django.db.models import Q
from tasks.models import Task, TaskApplication, TaskStatus, ApplyStatus
queryset = User.objects.filter(is_deleted=False)
# Non-admins can only see OPC users
if not (self.request.user.is_staff or self.request.user.is_superuser):
queryset = queryset.filter(user_roles__role__code='OPC_USER')
role = self.request.query_params.get('role')
username = self.request.query_params.get('username')
nickname = self.request.query_params.get('nickname')
real_name = self.request.query_params.get('real_name')
phone = self.request.query_params.get('phone')
email = self.request.query_params.get('email')
status_filter = self.request.query_params.get('status')
idle = self.request.query_params.get('idle')
if role:
queryset = queryset.filter(user_roles__role__code=role)
if username:
queryset = queryset.filter(username__icontains=username)
if nickname:
queryset = queryset.filter(nickname__icontains=nickname)
if real_name:
queryset = queryset.filter(opc_certifications__real_name__icontains=real_name)
if phone:
queryset = queryset.filter(phone__icontains=phone)
if email:
queryset = queryset.filter(email__icontains=email)
if status_filter == 'active':
queryset = queryset.filter(is_active=True)
elif status_filter == 'disabled':
queryset = queryset.filter(is_active=False)
if idle == 'true':
active_app_users = TaskApplication.objects.filter(
status__in=[ApplyStatus.PENDING, ApplyStatus.APPROVED, ApplyStatus.DELIVERED]
).values_list('applicant_id', flat=True)
active_task_publishers = Task.objects.filter(
status__in=[TaskStatus.OPEN, TaskStatus.IN_PROGRESS]
).values_list('publisher_id', flat=True)
queryset = queryset.exclude(id__in=active_app_users).exclude(id__in=active_task_publishers)
return queryset.order_by('-is_recommended', '-recommend_priority', '-created_at').distinct()
def get_permissions(self):
if self.action in ['me', 'update_profile', 'avatar', 'change_password', 'list', 'retrieve']:
return [drf_permissions.IsAuthenticated()]
if self.action in ['admin_create', 'toggle_status', 'reset_password', 'create', 'update', 'partial_update', 'destroy', 'update_opc_stats']:
return [HasAPIPermission()]
if self.action in ['request_password_reset', 'confirm_password_reset']:
return [drf_permissions.AllowAny()]
return super().get_permissions()
@action(detail=False, methods=['post'], permission_classes=[permissions.IsAdminUser])
def admin_create(self, request):
serializer = AdminUserCreateSerializer(data=request.data)
if serializer.is_valid():
user = serializer.save()
user.set_password(serializer.validated_data['password'])
user.save()
return Response(UserSerializer(user).data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def update(self, request, *args, **kwargs):
partial = kwargs.pop('partial', False)
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
self.perform_update(serializer)
# Handle explicitly read-only fields for Admin
if request.user.is_staff or request.user.is_superuser:
needs_save = False
for field in ['is_active', 'status', 'avatar_url', 'bio', 'location', 'face_url']:
if field in request.data:
setattr(instance, field, request.data[field])
needs_save = True
if needs_save:
instance.save()
# Handle Role Assignments inside the big form
if 'roles' in request.data:
role_codes = request.data['roles']
from .models import Role, UserRole
UserRole.objects.filter(user=instance).delete()
for code in role_codes:
role = Role.objects.filter(code=code).first()
if role:
UserRole.objects.create(user=instance, role=role, granted_by=request.user)
# Handle enterprise_info updates
enterprise_info = request.data.get('enterprise_info')
if enterprise_info and 'ENTERPRISE' in instance.roles:
from .models import Enterprise
ent, created = Enterprise.objects.get_or_create(user=instance, defaults={'status': 'VERIFIED'})
ent.company_name = enterprise_info.get('company_name', ent.company_name)
ent.credit_code = enterprise_info.get('credit_code', ent.credit_code)
ent.business_license = enterprise_info.get('business_license', ent.business_license)
ent.logo_url = enterprise_info.get('logo_url', ent.logo_url)
ent.contact_name = enterprise_info.get('contact_name', ent.contact_name)
ent.contact_phone = enterprise_info.get('contact_phone', ent.contact_phone)
ent.contact_email = enterprise_info.get('contact_email', ent.contact_email)
ent.address = enterprise_info.get('address', ent.address)
ent.description = enterprise_info.get('description', ent.description)
ent.save()
# Handle opc_certification updates
opc_certification = request.data.get('opc_certification')
if opc_certification and 'OPC_USER' in instance.roles:
from opc_cert.models import OpcCertification
cert, created = OpcCertification.objects.get_or_create(user=instance, defaults={'status': 'APPROVED'})
cert.real_name = opc_certification.get('real_name', cert.real_name)
cert.id_card = opc_certification.get('id_card', cert.id_card)
cert.experience = opc_certification.get('experience', cert.experience)
cert.resume_url = opc_certification.get('resume_url', cert.resume_url)
cert.attachments = opc_certification.get('attachments', cert.attachments)
cert.skills = opc_certification.get('skills', cert.skills)
cert.status = opc_certification.get('status', cert.status)
if 'rating' in opc_certification:
cert.rating = opc_certification.get('rating')
cert.save()
if 'rating' in opc_certification:
instance.rating = opc_certification.get('rating')
instance.save()
if getattr(instance, '_prefetched_objects_cache', None):
instance._prefetched_objects_cache = {}
return Response(self.get_serializer(instance).data)
@action(detail=False, methods=['post'])
def change_password(self, request):
serializer = ChangePasswordSerializer(data=request.data)
if serializer.is_valid():
user = request.user
if not user.check_password(serializer.validated_data['old_password']):
return Response({'old_password': ['原密码错误']}, status=status.HTTP_400_BAD_REQUEST)
user.set_password(serializer.validated_data['new_password'])
user.save()
return Response({'status': '密码修改成功'})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@action(detail=False, methods=['post'], permission_classes=[drf_permissions.AllowAny])
def request_password_reset(self, request):
serializer = PasswordResetRequestSerializer(data=request.data)
if serializer.is_valid():
email = serializer.validated_data['email']
user = User.objects.filter(email=email).first()
if user:
# Mock sending code: Generate 6-digit code and store in cache
code = ''.join(random.choices('0123456789', k=6))
cache.set(f'pwd_reset_{email}', code, timeout=300) # 5 minutes
# In production, send via Email/SMS. Here we return it or print it.
print(f"--- MOCK EMAIL --- Password reset code for {email}: {code}")
return Response({'status': '验证码已发送 (开发环境请查看控制台输出)'})
else:
# To prevent email enumeration, still return success
return Response({'status': '验证码已发送 (开发环境请查看控制台输出)'})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@action(detail=False, methods=['post'], permission_classes=[drf_permissions.AllowAny])
def confirm_password_reset(self, request):
serializer = PasswordResetConfirmSerializer(data=request.data)
if serializer.is_valid():
email = serializer.validated_data['email']
code = serializer.validated_data['verification_code']
cached_code = cache.get(f'pwd_reset_{email}')
if not cached_code or cached_code != code:
return Response({'verification_code': ['验证码无效或已过期']}, status=status.HTTP_400_BAD_REQUEST)
user = User.objects.filter(email=email).first()
if user:
user.set_password(serializer.validated_data['new_password'])
user.save()
cache.delete(f'pwd_reset_{email}')
return Response({'status': '密码重置成功'})
return Response({'detail': '用户不存在'}, status=status.HTTP_404_NOT_FOUND)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@action(detail=True, methods=['post'])
def reset_password(self, request, pk=None):
user = self.get_object()
new_password = request.data.get('password', '123456')
user.set_password(new_password)
user.save()
return Response({'status': '密码已重置'})
@action(detail=False, methods=['get'])
def me(self, request):
serializer = self.get_serializer(request.user)
return Response(serializer.data)
def perform_destroy(self, instance):
from tasks.models import Task, TaskApplication, TaskStatus, ApplyStatus
from django.utils import timezone
force = self.request.query_params.get('force', '').lower() == 'true'
# Check active expert applications
active_apps = TaskApplication.objects.filter(
applicant=instance,
status__in=[ApplyStatus.PENDING, ApplyStatus.APPROVED, ApplyStatus.DELIVERED]
)
# Check active enterprise tasks
active_tasks = Task.objects.filter(
publisher=instance,
status__in=[TaskStatus.OPEN, TaskStatus.IN_PROGRESS]
)
has_blockers = active_apps.exists() or active_tasks.exists()
if has_blockers and not force:
from rest_framework.exceptions import ValidationError
blocker_details = []
if active_apps.exists():
blocker_details.append(f'专家接单 {active_apps.count()}')
if active_tasks.exists():
blocker_details.append(f'发布任务 {active_tasks.count()}')
raise ValidationError(
f'用户 {instance.username} 有正在进行中的任务({"".join(blocker_details)}),无法直接删除。请先取消相关任务或强制删除。'
)
if has_blockers and force:
# Lock tasks instead of cancelling — mark as anomalous
# For tasks where this user is the ONLY active participant, set CANCELLED
# Otherwise keep IN_PROGRESS but lock the user's application
for app in active_apps:
app.status = ApplyStatus.REJECTED
app.reject_reason = f'用户 {instance.nickname or instance.username} 账户已被管理员删除/锁定'
app.save()
# Check if task still has other active participants
other_apps = app.task.applications.filter(
status__in=[ApplyStatus.APPROVED, ApplyStatus.DELIVERED]
).exclude(applicant=instance)
if not other_apps.exists() and app.task.status == TaskStatus.IN_PROGRESS:
app.task.status = TaskStatus.CANCELLED
app.task.cancel_reason = f'唯一执行专家 {instance.nickname or instance.username} 已被系统锁定,任务异常中止'
app.task.cancelled_at = timezone.now()
app.task.save()
active_tasks.update(
status=TaskStatus.CANCELLED,
cancelled_at=timezone.now(),
cancel_reason=f'管理员强制删除用户 {instance.username},系统自动取消关联任务'
)
# Check and cascade close enterprise
enterprise = Enterprise.objects.filter(user=instance, is_deleted=False).first()
if enterprise:
# Close enterprise: soft delete + remove all members
enterprise.is_deleted = True
enterprise.status = 'REJECTED'
enterprise.save()
EnterpriseMember.objects.filter(enterprise=enterprise).delete()
# Cancel enterprise's published tasks
Task.objects.filter(
enterprise=enterprise,
status__in=[TaskStatus.OPEN, TaskStatus.IN_PROGRESS]
).update(
status=TaskStatus.CANCELLED,
cancelled_at=timezone.now(),
cancel_reason=f'企业 {enterprise.company_name} 已被清退,关联任务自动取消'
)
instance.is_deleted = True
instance.is_active = False
# Free up unique fields for re-registration while preserving for audit
deleted_suffix = f'__deleted_{instance.id}'
if instance.phone:
instance.phone = instance.phone + deleted_suffix
if instance.email:
instance.email = instance.email + deleted_suffix
instance.username = instance.username + deleted_suffix
instance.save()
@action(detail=False, methods=['post'], permission_classes=[permissions.IsAdminUser])
def batch_delete(self, request):
user_ids = request.data.get('user_ids', [])
force = request.data.get('force', False)
if not user_ids:
return Response({'detail': '缺少 user_ids'}, status=status.HTTP_400_BAD_REQUEST)
users = User.objects.filter(id__in=user_ids)
from tasks.models import Task, TaskApplication, TaskStatus, ApplyStatus
from django.utils import timezone
blocked_users = []
for user in users:
active_apps = TaskApplication.objects.filter(
applicant=user,
status__in=[ApplyStatus.PENDING, ApplyStatus.APPROVED, ApplyStatus.DELIVERED]
)
active_tasks = Task.objects.filter(
publisher=user,
status__in=[TaskStatus.OPEN, TaskStatus.IN_PROGRESS]
)
if active_apps.exists() or active_tasks.exists():
blocked_users.append({
'id': str(user.id),
'username': user.username,
'nickname': user.nickname or user.username,
'app_count': active_apps.count(),
'task_count': active_tasks.count(),
})
if blocked_users and not force:
return Response({
'detail': f'{len(blocked_users)} 个用户有正在进行中的任务,无法直接批量删除。',
'blocked_users': blocked_users,
'has_blockers': True,
}, status=status.HTTP_400_BAD_REQUEST)
if blocked_users and force:
for user in users:
TaskApplication.objects.filter(
applicant=user,
status__in=[ApplyStatus.PENDING, ApplyStatus.APPROVED, ApplyStatus.DELIVERED]
).update(status=ApplyStatus.WITHDRAWN)
Task.objects.filter(
publisher=user,
status__in=[TaskStatus.OPEN, TaskStatus.IN_PROGRESS]
).update(
status=TaskStatus.CANCELLED,
cancelled_at=timezone.now(),
cancel_reason=f'管理员批量强制删除,系统自动取消关联任务'
)
for user in users:
user.is_deleted = True
user.is_active = False
deleted_suffix = f'__deleted_{user.id}'
if user.phone:
user.phone = user.phone + deleted_suffix
if user.email:
user.email = user.email + deleted_suffix
user.username = user.username + deleted_suffix
user.save()
return Response({'status': '批量删除成功'})
@action(detail=True, methods=['post'], permission_classes=[permissions.IsAdminUser])
def assign_roles(self, request, pk=None):
user = self.get_object()
role_codes = request.data.get('roles', [])
from .models import Role, UserRole
# 清除旧角色
UserRole.objects.filter(user=user).delete()
# 重新分配
for code in role_codes:
role = Role.objects.filter(code=code).first()
if role:
UserRole.objects.create(user=user, role=role, granted_by=request.user)
return Response({'status': '角色分配成功'})
@action(detail=True, methods=['post'], permission_classes=[permissions.IsAdminUser])
def update_rating(self, request, pk=None):
user = self.get_object()
rating = request.data.get('rating')
if rating is not None:
user.rating = rating
user.save()
return Response({'status': '评分已更新'})
return Response({'detail': '缺少评分参数'}, status=status.HTTP_400_BAD_REQUEST)
@action(detail=True, methods=['get'], permission_classes=[permissions.IsAdminUser])
def active_tasks(self, request, pk=None):
user = self.get_object()
from tasks.models import Task, TaskApplication, TaskStatus, ApplyStatus
results = []
# Expert tasks
expert_apps = TaskApplication.objects.filter(
applicant=user,
status__in=[ApplyStatus.PENDING, ApplyStatus.APPROVED, ApplyStatus.DELIVERED]
)
for app in expert_apps:
results.append({
'id': app.task.id,
'title': app.task.title,
'role': '专家接单',
'status': app.status,
'task_status': app.task.status,
'type': 'application'
})
# Publisher tasks
pub_tasks = Task.objects.filter(
publisher=user,
status__in=[TaskStatus.OPEN, TaskStatus.IN_PROGRESS]
)
for t in pub_tasks:
results.append({
'id': t.id,
'title': t.title,
'role': '任务发布方',
'status': t.status,
'task_status': t.status,
'type': 'task'
})
return Response(results)
@action(detail=False, methods=['put'])
def update_profile(self, request):
user = request.user
serializer = UserSerializer(user, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
return Response(serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@action(detail=True, methods=['post'], permission_classes=[permissions.IsAdminUser])
def toggle_status(self, request, pk=None):
user = self.get_object()
user.is_active = not user.is_active
user.save()
return Response({'status': '状态已更新', 'is_active': user.is_active})
@action(detail=True, methods=['put'], permission_classes=[permissions.IsAdminUser])
def update_opc_stats(self, request, pk=None):
user = self.get_object()
rating = request.data.get('rating')
completed_tasks = request.data.get('completed_tasks')
if rating is not None:
user.rating = rating
if completed_tasks is not None:
user.completed_tasks = completed_tasks
user.save()
return Response(UserSerializer(user).data)
@action(detail=True, methods=['post'], permission_classes=[permissions.IsAdminUser])
def toggle_recommend(self, request, pk=None):
user = self.get_object()
user.is_recommended = request.data.get('is_recommended', not user.is_recommended)
user.recommend_priority = request.data.get('recommend_priority', user.recommend_priority)
user.save(update_fields=['is_recommended', 'recommend_priority'])
return Response({'is_recommended': user.is_recommended, 'recommend_priority': user.recommend_priority})
@action(detail=False, methods=['get'], permission_classes=[permissions.IsAdminUser])
def deleted_users(self, request):
"""List soft-deleted users for admin review/recovery."""
deleted = User.objects.filter(is_deleted=True).order_by('-updated_at')
page = self.paginate_queryset(deleted)
if page is not None:
serializer = UserSerializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = UserSerializer(deleted, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'], permission_classes=[permissions.IsAdminUser])
def restore(self, request, pk=None):
"""Restore a soft-deleted user by clearing the deletion suffix."""
try:
user = User.objects.get(id=pk, is_deleted=True)
except User.DoesNotExist:
return Response({'detail': '未找到已删除的用户'}, status=status.HTTP_404_NOT_FOUND)
deleted_suffix = f'__deleted_{user.id}'
# Restore unique fields
if user.username.endswith(deleted_suffix):
original_username = user.username[:-len(deleted_suffix)]
if User.objects.filter(username=original_username, is_deleted=False).exists():
return Response({'detail': f'用户名 {original_username} 已被新用户占用,无法恢复'}, status=status.HTTP_400_BAD_REQUEST)
user.username = original_username
if user.phone and user.phone.endswith(deleted_suffix):
original_phone = user.phone[:-len(deleted_suffix)]
if User.objects.filter(phone=original_phone, is_deleted=False).exists():
return Response({'detail': f'手机号 {original_phone} 已被新用户占用,无法恢复'}, status=status.HTTP_400_BAD_REQUEST)
user.phone = original_phone
if user.email and user.email.endswith(deleted_suffix):
original_email = user.email[:-len(deleted_suffix)]
if User.objects.filter(email=original_email, is_deleted=False).exists():
return Response({'detail': f'邮箱 {original_email} 已被新用户占用,无法恢复'}, status=status.HTTP_400_BAD_REQUEST)
user.email = original_email
user.is_deleted = False
user.is_active = True
user.save()
return Response({'status': f'用户 {user.username} 已成功恢复'})
@action(detail=False, methods=['post'])
def avatar(self, request):
# In a real app, handle file upload to MinIO
# For now, we expect a URL or just mock it
user = request.user
avatar_url = request.data.get('avatar_url')
if avatar_url:
user.avatar_url = avatar_url
user.save()
return Response({'status': '头像已更新', 'avatar_url': avatar_url})
return Response({'detail': '缺少 avatar_url'}, status=status.HTTP_400_BAD_REQUEST)
class RegisterView(generics.CreateAPIView):
"""
@author: xujl
Api说明: 平台普通用户注册接口视图提供基础的账号密码/手机/邮箱注册流程
"""
queryset = User.objects.all()
permission_classes = [permissions.AllowAny]
serializer_class = RegisterSerializer
class EnterpriseViewSet(viewsets.ModelViewSet):
"""
@author: xujl
Api说明: 企业资质及账号管理接口视图企业用户提交工商资料认证基本信息更新管理员对企业入驻进行审核
"""
queryset = Enterprise.objects.filter(is_deleted=False).order_by('-created_at')
permission_classes = [permissions.IsAuthenticated]
def get_serializer_class(self):
if self.action == 'create' and (self.request.user.is_staff or self.request.user.is_superuser):
from .serializers import AdminEnterpriseCreateSerializer
return AdminEnterpriseCreateSerializer
from .serializers import EnterpriseSerializer
return EnterpriseSerializer
required_permissions = {
'GET': 'api:enterprises:read',
'POST': 'api:enterprises:write',
'PUT': 'api:enterprises:write',
'PATCH': 'api:enterprises:write',
'DELETE': 'api:enterprises:delete'
}
def get_permissions(self):
if self.action in ['me', 'create', 'update', 'partial_update']:
return [drf_permissions.IsAuthenticated()]
if self.action in ['list', 'retrieve', 'verify_status']:
return [HasAPIPermission()]
return super().get_permissions()
@action(detail=False, methods=['get'])
def me(self, request):
try:
# First check if user is the enterprise owner
enterprise = Enterprise.objects.filter(user=request.user).first()
role = 'ADMIN'
if not enterprise:
# If not owner, check if user is a member
member = EnterpriseMember.objects.filter(user=request.user).first()
if member:
enterprise = member.enterprise
role = member.role
else:
return Response({'detail': '未找到企业信息'}, status=status.HTTP_404_NOT_FOUND)
serializer = self.get_serializer(enterprise)
data = serializer.data
data['my_role'] = role
return Response(data)
except Exception as e:
return Response({'detail': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def perform_create(self, serializer):
if self.request.user.is_staff or self.request.user.is_superuser:
serializer.save()
else:
enterprise = serializer.save(user=self.request.user)
# Auto assign ENTERPRISE role
from .models import Role, UserRole
role, created = Role.objects.get_or_create(code='ENTERPRISE', defaults={'name': '企业用户'})
UserRole.objects.get_or_create(user=self.request.user, role=role)
def _check_admin_permission(self, request, enterprise):
if request.user.is_superuser or request.user.is_staff:
return True
if request.user == enterprise.user:
return True
member = EnterpriseMember.objects.filter(enterprise=enterprise, user=request.user).first()
return member and member.role == 'ADMIN'
def update(self, request, *args, **kwargs):
enterprise = self.get_object()
if not self._check_admin_permission(request, enterprise):
return Response({'detail': '只有企业管理员可以编辑企业数据'}, status=status.HTTP_403_FORBIDDEN)
return super().update(request, *args, **kwargs)
def partial_update(self, request, *args, **kwargs):
enterprise = self.get_object()
if not self._check_admin_permission(request, enterprise):
return Response({'detail': '只有企业管理员可以编辑企业数据'}, status=status.HTTP_403_FORBIDDEN)
return super().partial_update(request, *args, **kwargs)
def perform_destroy(self, instance):
"""Cascade enterprise deletion: soft-delete, remove members, cancel active tasks, free email."""
from tasks.models import Task, TaskStatus
from django.utils import timezone
# Cancel all active tasks for this enterprise
Task.objects.filter(
enterprise=instance,
status__in=[TaskStatus.OPEN, TaskStatus.IN_PROGRESS]
).update(
status=TaskStatus.CANCELLED,
cancelled_at=timezone.now(),
cancel_reason=f'企业 {instance.company_name} 已被清退,项目中止'
)
# Remove all team members
EnterpriseMember.objects.filter(enterprise=instance).delete()
# Free up the enterprise user's email for re-use (suffix pattern)
owner = instance.user
suffix = f'__deleted_{str(instance.id)[:8]}'
if owner.email and '__deleted_' not in owner.email:
owner.email = owner.email + suffix
owner.save(update_fields=['email'])
# Free up credit_code
if instance.credit_code and '__deleted_' not in instance.credit_code:
instance.credit_code = instance.credit_code + suffix
# Soft delete enterprise
instance.is_deleted = True
instance.status = 'REJECTED'
instance.save()
@action(detail=False, methods=['get'])
def deleted_enterprises(self, request):
"""List soft-deleted enterprises for the recycle bin."""
deleted = Enterprise.objects.filter(is_deleted=True).order_by('-updated_at')
serializer = self.get_serializer(deleted, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def restore(self, request, pk=None):
"""Restore a soft-deleted enterprise."""
enterprise = Enterprise.objects.filter(pk=pk, is_deleted=True).first()
if not enterprise:
return Response({'detail': '未找到已删除的企业'}, status=status.HTTP_404_NOT_FOUND)
# Restore credit_code
if enterprise.credit_code and '__deleted_' in enterprise.credit_code:
original_code = enterprise.credit_code.split('__deleted_')[0]
if Enterprise.objects.filter(credit_code=original_code, is_deleted=False).exists():
return Response({'detail': f'统一社会信用代码 {original_code} 已被其他企业占用'}, status=status.HTTP_409_CONFLICT)
enterprise.credit_code = original_code
# Restore owner email
owner = enterprise.user
if owner.email and '__deleted_' in owner.email:
original_email = owner.email.split('__deleted_')[0]
if User.objects.filter(email=original_email, is_deleted=False).exclude(pk=owner.pk).exists():
return Response({'detail': f'邮箱 {original_email} 已被其他用户占用,无法恢复'}, status=status.HTTP_409_CONFLICT)
owner.email = original_email
owner.save(update_fields=['email'])
enterprise.is_deleted = False
enterprise.status = 'VERIFIED'
enterprise.save()
return Response({'detail': '企业已恢复'})
@action(detail=True, methods=['post'])
def verify_status(self, request, pk=None):
enterprise = self.get_object()
status_val = request.data.get('status')
if status_val not in [choice[0] for choice in Enterprise.EnterpriseStatus.choices]:
return Response({'detail': '无效的状态'}, status=status.HTTP_400_BAD_REQUEST)
enterprise.status = status_val
enterprise.save()
return Response({'status': '企业审核状态已更新', 'enterprise_status': enterprise.status})
class EnterpriseRegisterView(generics.CreateAPIView):
"""
@author: xujl
Api说明: 企业级账号入驻/注册接口视图提交企业信用代码和法人代表等认证资料
"""
serializer_class = EnterpriseRegisterSerializer
permission_classes = [permissions.AllowAny]
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
serializer.save()
return Response({'status': '企业注册成功'}, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class EnterpriseMemberViewSet(viewsets.ModelViewSet):
"""
@author: xujl
Api说明: 企业内部成员管理接口视图用于给企业内部关联多个员工账号并分配角色管理员普通成员
"""
serializer_class = EnterpriseMemberSerializer
permission_classes = [permissions.IsAuthenticated]
def get_queryset(self):
if self.request.user.is_staff or self.request.user.is_superuser:
enterprise_id = self.request.query_params.get('enterprise')
if enterprise_id:
return EnterpriseMember.objects.filter(enterprise_id=enterprise_id).order_by('-joined_at')
return EnterpriseMember.objects.all().order_by('-joined_at')
try:
enterprise = Enterprise.objects.get(user=self.request.user)
return EnterpriseMember.objects.filter(enterprise=enterprise).order_by('-joined_at')
except Enterprise.DoesNotExist:
return EnterpriseMember.objects.none()
def perform_create(self, serializer):
enterprise = Enterprise.objects.get(user=self.request.user)
serializer.save(enterprise=enterprise)
@action(detail=False, methods=['post'], permission_classes=[permissions.IsAuthenticated])
def add_member(self, request):
serializer = EnterpriseMemberAddSerializer(data=request.data, context={'request': request})
if serializer.is_valid():
member = serializer.save()
return Response(EnterpriseMemberSerializer(member).data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@action(detail=False, methods=['post'], permission_classes=[permissions.IsAdminUser])
def batch_delete(self, request):
member_ids = request.data.get('member_ids', [])
if not member_ids:
return Response({'detail': '缺少 member_ids'}, status=status.HTTP_400_BAD_REQUEST)
EnterpriseMember.objects.filter(id__in=member_ids).delete()
return Response({'status': '批量删除成功'})
@action(detail=False, methods=['post'])
def invite(self, request):
email = request.data.get('email')
role = request.data.get('role', 'MEMBER')
try:
enterprise = Enterprise.objects.get(user=request.user)
user = User.objects.get(email=email)
if EnterpriseMember.objects.filter(enterprise=enterprise, user=user).exists():
return Response({'detail': '该用户已在团队中'}, status=status.HTTP_400_BAD_REQUEST)
member = EnterpriseMember.objects.create(
enterprise=enterprise,
user=user,
role=role
)
return Response(EnterpriseMemberSerializer(member).data, status=status.HTTP_201_CREATED)
except Enterprise.DoesNotExist:
return Response({'detail': '未找到企业信息'}, status=status.HTTP_404_NOT_FOUND)
except User.DoesNotExist:
return Response({'detail': '该用户尚未注册平台,请先引导其注册'}, status=status.HTTP_404_NOT_FOUND)
class RoleViewSet(viewsets.ModelViewSet):
"""
@author: xujl
Api说明: RBAC 角色管理接口视图用于定义平台角色类别以及为用户进行角色的授予和回收
"""
queryset = Role.objects.all().order_by('sort_order', 'created_at')
serializer_class = RoleSerializer
pagination_class = None # Return all roles without pagination
permission_classes = [HasAPIPermission]
required_permissions = {
'GET': 'menu:account:roles',
'POST': 'menu:account:roles',
'PUT': 'menu:account:roles',
'DELETE': 'menu:account:roles'
}
@action(detail=True, methods=['post'])
def assign_permissions(self, request, pk=None):
role = self.get_object()
permission_ids = request.data.get('permissions', [])
from .models import Permission, RolePermission
RolePermission.objects.filter(role=role).delete()
for pid in permission_ids:
perm = Permission.objects.filter(id=pid).first()
if perm:
RolePermission.objects.create(role=role, permission=perm)
return Response({'status': '权限分配成功'})
class PermissionViewSet(viewsets.ModelViewSet):
"""
@author: xujl
Api说明: RBAC 权限节点管理接口视图用于查询系统的路由菜单及按钮级别的权限树结构
"""
queryset = Permission.objects.all().order_by('sort_order', 'created_at')
serializer_class = PermissionSerializer
pagination_class = None # Return all permissions without pagination
permission_classes = [HasAPIPermission]
required_permissions = {
'GET': 'menu:account:permissions',
'POST': 'menu:account:permissions',
'PUT': 'menu:account:permissions',
'DELETE': 'menu:account:permissions'
}