我想要在label studio中进行自定义,以便管理员可以将项目分配给用户,我该如何实现这个?

问题描述 投票:0回答:1

Assign feature to be shown like this in the label studio管理员可以将项目分配给用户。使用复选框管理员可以选择特定项目的用户来分配项目。

这是项目模型

class ProjectOnboardingSteps(models.Model):
    """ """

    DATA_UPLOAD = "DU"
    CONF_SETTINGS = "CF"
    PUBLISH = "PB"
    INVITE_EXPERTS = "IE"

    STEPS_CHOICES = (
        (DATA_UPLOAD, "Import your data"),
        (CONF_SETTINGS, "Configure settings"),
        (PUBLISH, "Publish project"),
        (INVITE_EXPERTS, "Invite collaborators"),
    )

    code = models.CharField(max_length=2, choices=STEPS_CHOICES, null=True)

    title = models.CharField(_('title'), max_length=1000, null=False)
    description = models.TextField(_('description'), null=False)
    order = models.IntegerField(default=0)

    created_at = models.DateTimeField(_('created at'), auto_now_add=True)
    updated_at = models.DateTimeField(_('updated at'), auto_now=True)

    class Meta:
        ordering = ['order']


class ProjectOnboarding(models.Model):
    """ """

    step = models.ForeignKey(ProjectOnboardingSteps, on_delete=models.CASCADE, related_name="po_through")
    project = models.ForeignKey(Project, on_delete=models.CASCADE)

    finished = models.BooleanField(default=False)

    created_at = models.DateTimeField(_('created at'), auto_now_add=True)
    updated_at = models.DateTimeField(_('updated at'), auto_now=True)

    def save(self, *args, **kwargs):
        super(ProjectOnboarding, self).save(*args, **kwargs)
        if ProjectOnboarding.objects.filter(project=self.project, finished=True).count() == 4:
            self.project.skip_onboarding = True
            self.project.save(recalc=False)


class LabelStreamHistory(models.Model):

    user = models.ForeignKey(
        settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name='histories', help_text='User ID'
    )  # noqa
    project = models.ForeignKey(Project, on_delete=models.CASCADE, related_name='histories', help_text='Project ID')
    data = models.JSONField(default=list)

    class Meta:
        constraints = [
            models.UniqueConstraint(fields=['user', 'project'], name='unique_history')
        ]


class ProjectMember(models.Model):

    user = models.ForeignKey(
        settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name='project_memberships', help_text='User ID'
    )  # noqa
    project = models.ForeignKey(Project, on_delete=models.CASCADE, related_name='members', help_text='Project ID')
    enabled = models.BooleanField(default=True, help_text='Project member is enabled')
    created_at = models.DateTimeField(_('created at'), auto_now_add=True)
    updated_at = models.DateTimeField(_('updated at'), auto_now=True)


class ProjectSummary(models.Model):

    project = AutoOneToOneField(Project, primary_key=True, on_delete=models.CASCADE, related_name='summary')
    created_at = models.DateTimeField(_('created at'), auto_now_add=True, help_text='Creation time')

    # { col1: task_count_with_col1, col2: task_count_with_col2 }
    all_data_columns = JSONField(
        _('all data columns'), null=True, default=dict, help_text='All data columns found in imported tasks'
    )
    # [col1, col2]
    common_data_columns = JSONField(
        _('common data columns'), null=True, default=list, help_text='Common data columns found across imported tasks'
    )
    # { (from_name, to_name, type): annotation_count }
    created_annotations = JSONField(
        _('created annotations'),
        null=True,
        default=dict,
        help_text='Unique annotation types identified by tuple (from_name, to_name, type)',
    )  # noqa
    # { from_name: {label1: task_count_with_label1, label2: task_count_with_label2} }
    created_labels = JSONField(_('created labels'), null=True, default=dict, help_text='Unique labels')
    created_labels_drafts = JSONField(_('created labels in drafts'), null=True, default=dict, help_text='Unique drafts labels')

    def has_permission(self, user):
        user.project = self.project  # link for activity log
        return self.project.has_permission(user)

    def reset(self, tasks_data_based=True):
        if tasks_data_based:
            self.all_data_columns = {}
            self.common_data_columns = []
        self.created_annotations = {}
        self.created_labels = {}
        self.created_labels_drafts = {}
        self.save()

    def update_data_columns(self, tasks):
        common_data_columns = set()
        all_data_columns = dict(self.all_data_columns)
        for task in tasks:
            try:
                task_data = get_attr_or_item(task, 'data')
            except KeyError:
                task_data = task
            task_data_keys = task_data.keys()
            for column in task_data_keys:
                all_data_columns[column] = all_data_columns.get(column, 0) + 1
            if not common_data_columns:
                common_data_columns = set(task_data_keys)
            else:
                common_data_columns &= set(task_data_keys)

        self.all_data_columns = all_data_columns
        if not self.common_data_columns:
            self.common_data_columns = list(sorted(common_data_columns))
        else:
            self.common_data_columns = list(sorted(set(self.common_data_columns) & common_data_columns))
        logger.debug(f'summary.all_data_columns = {self.all_data_columns}')
        logger.debug(f'summary.common_data_columns = {self.common_data_columns}')
        self.save(update_fields=['all_data_columns', 'common_data_columns'])

    def remove_data_columns(self, tasks):
        all_data_columns = dict(self.all_data_columns)
        keys_to_remove = []

        for task in tasks:
            task_data = get_attr_or_item(task, 'data')
            for key in task_data.keys():
                if key in all_data_columns:
                    all_data_columns[key] -= 1
                    if all_data_columns[key] == 0:
                        keys_to_remove.append(key)
                        all_data_columns.pop(key)
        self.all_data_columns = all_data_columns

        if keys_to_remove:
            common_data_columns = list(self.common_data_columns)
            for key in keys_to_remove:
                if key in common_data_columns:
                    common_data_columns.remove(key)
            self.common_data_columns = common_data_columns
        logger.debug(f'summary.all_data_columns = {self.all_data_columns}')
        logger.debug(f'summary.common_data_columns = {self.common_data_columns}')
        self.save(update_fields=['all_data_columns', 'common_data_columns', ])

    def _get_annotation_key(self, result):
        result_type = result.get('type', None)
        if result_type in ('relation', 'pairwise', None):
            return None
        if 'from_name' not in result or 'to_name' not in result:
            logger.error(
                'Unexpected annotation.result format: "from_name" or "to_name" not found',
                extra={'sentry_skip': True},
            )
            return None
        result_from_name = result['from_name']
        key = get_annotation_tuple(result_from_name, result['to_name'], result_type or '')
        return key

    def _get_labels(self, result):
        result_type = result.get('type')
        # DEV-1990 Workaround for Video labels as there are no labels in VideoRectangle tag
        if result_type in ['videorectangle']:
            result_type = 'labels'
        result_value = result['value'].get(result_type)
        if not result_value or not isinstance(result_value, list) or result_type == 'text':
            # Non-list values are not labels. TextArea list values (texts) are not labels too.
            return []
        # Labels are stored in list
        labels = []
        for label in result_value:
            if result_type == 'taxonomy' and isinstance(label, list):
                for label_ in label:
                    labels.append(str(label_))
            else:
                labels.append(str(label))
        return labels

    def update_created_annotations_and_labels(self, annotations):
        created_annotations = dict(self.created_annotations)
        labels = dict(self.created_labels)
        for annotation in annotations:
            results = get_attr_or_item(annotation, 'result') or []
            if not isinstance(results, list):
                continue

            for result in results:
                # aggregate annotation types
                key = self._get_annotation_key(result)
                if not key:
                    continue
                created_annotations[key] = created_annotations.get(key, 0) + 1
                from_name = result['from_name']

                # aggregate labels
                if from_name not in self.created_labels:
                    labels[from_name] = dict()

                for label in self._get_labels(result):
                    labels[from_name][label] = labels[from_name].get(label, 0) + 1

        logger.debug(f'summary.created_annotations = {created_annotations}')
        logger.debug(f'summary.created_labels = {labels}')
        self.created_annotations = created_annotations
        self.created_labels = labels
        self.save(update_fields=['created_annotations', 'created_labels'])

    def remove_created_annotations_and_labels(self, annotations):
        created_annotations = dict(self.created_annotations)
        labels = dict(self.created_labels)
        for annotation in annotations:
            results = get_attr_or_item(annotation, 'result') or []
            if not isinstance(results, list):
                continue

            for result in results:
                # reduce annotation counters
                key = self._get_annotation_key(result)
                if key in created_annotations:
                    created_annotations[key] -= 1
                    if created_annotations[key] == 0:
                        created_annotations.pop(key)

                # reduce labels counters
                from_name = result.get('from_name', None)
                if from_name not in labels:
                    continue
                for label in self._get_labels(result):
                    label = str(label)
                    if label in labels[from_name]:
                        labels[from_name][label] -= 1
                        if labels[from_name][label] == 0:
                            labels[from_name].pop(label)
                if not labels[from_name]:
                    labels.pop(from_name)
        logger.debug(f'summary.created_annotations = {created_annotations}')
        logger.debug(f'summary.created_labels = {labels}')
        self.created_annotations = created_annotations
        self.created_labels = labels
        self.save(update_fields=['created_annotations', 'created_labels'])

    def update_created_labels_drafts(self, drafts):
        labels = dict(self.created_labels_drafts)
        for draft in drafts:
            results = get_attr_or_item(draft, 'result') or []
            if not isinstance(results, list):
                continue

            for result in results:
                if 'from_name' not in result:
                    continue
                from_name = result['from_name']

                # aggregate labels
                if from_name not in self.created_labels_drafts:
                    labels[from_name] = dict()

                for label in self._get_labels(result):
                    labels[from_name][label] = labels[from_name].get(label, 0) + 1

        logger.debug(f'summary.created_labels = {labels}')
        self.created_labels_drafts = labels
        self.save(update_fields=['created_labels_drafts'])

    def remove_created_drafts_and_labels(self, drafts):
        labels = dict(self.created_labels_drafts)
        for draft in drafts:
            results = get_attr_or_item(draft, 'result') or []
            if not isinstance(results, list):
                continue

            for result in results:
                # reduce labels counters
                from_name = result.get('from_name', None)
                if from_name not in labels:
                    continue
                for label in self._get_labels(result):
                    label = str(label)
                    if label in labels[from_name]:
                        labels[from_name][label] -= 1
                        if labels[from_name][label] == 0:
                            labels[from_name].pop(label)
                if not labels[from_name]:
                    labels.pop(from_name)
        logger.debug(f'summary.created_labels = {labels}')
        self.created_labels_drafts = labels
        self.save(update_fields=['created_labels_drafts'])


class ProjectImport(models.Model):
    class Status(models.TextChoices):
        CREATED = 'created', _('Created')
        IN_PROGRESS = 'in_progress', _('In progress')
        FAILED = 'failed', _('Failed')
        COMPLETED = 'completed', _('Completed')

    project = models.ForeignKey(
        'projects.Project', null=True, related_name='imports', on_delete=models.CASCADE
    )
    preannotated_from_fields = models.JSONField(null=True, blank=True)
    commit_to_project = models.BooleanField(default=False)
    return_task_ids = models.BooleanField(default=False)
    status = models.CharField(max_length=64, choices=Status.choices, default=Status.CREATED)
    url = models.CharField(max_length=2048, null=True, blank=True)
    traceback = models.TextField(null=True, blank=True)
    error = models.TextField(null=True, blank=True)
    created_at = models.DateTimeField(_('created at'), null=True, auto_now_add=True, help_text='Creation time')
    updated_at = models.DateTimeField(_('updated at'), null=True, auto_now_add=True, help_text='Updated time')
    finished_at = models.DateTimeField(_('finished at'), help_text='Complete or fail time', null=True, default=None)
    task_count = models.IntegerField(default=0)
    annotation_count = models.IntegerField(default=0)
    prediction_count = models.IntegerField(default=0)
    duration = models.IntegerField(default=0)
    file_upload_ids = models.JSONField(default=list)
    could_be_tasks_list = models.BooleanField(default=False)
    found_formats = models.JSONField(default=list)
    data_columns = models.JSONField(default=list)
    tasks = models.JSONField(blank=True, null=True)
    task_ids = models.JSONField(default=list)

    def has_permission(self, user):
        return self.project.has_permission(user)


class ProjectReimport(models.Model):
    class Status(models.TextChoices):
        CREATED = 'created', _('Created')
        IN_PROGRESS = 'in_progress', _('In progress')
        FAILED = 'failed', _('Failed')
        COMPLETED = 'completed', _('Completed')

    project = models.ForeignKey(
        'projects.Project', null=True, related_name='reimports', on_delete=models.CASCADE
    )
    status = models.CharField(max_length=64, choices=Status.choices, default=Status.CREATED)
    error = models.TextField(null=True, blank=True)
    task_count = models.IntegerField(default=0)
    annotation_count = models.IntegerField(default=0)
    prediction_count = models.IntegerField(default=0)
    duration = models.IntegerField(default=0)
    file_upload_ids = models.JSONField(default=list)
    files_as_tasks_list = models.BooleanField(default=False)
    found_formats = models.JSONField(default=list)
    data_columns = models.JSONField(default=list)
    traceback = models.TextField(null=True, blank=True)

    def has_permission(self, user):
        return self.project.has_permission(user)

这是用户模型

class UserManager(BaseUserManager):
    use_in_migrations = True

    def _create_user(self, email, password, **extra_fields):
        """
        Create and save a user with the given email and password.
        """
        if not email:
            raise ValueError('Must specify an email address')

        email = self.normalize_email(email)
        user = self.model(email=email, **extra_fields)

        user.set_password(password)
        user.save(using=self._db)

        return user

    def create_user(self, email, password=None, **extra_fields):
        extra_fields.setdefault('is_staff', False)
        extra_fields.setdefault('is_superuser', False)
        return self._create_user(email, password, **extra_fields)

    def create_superuser(self, email, password, **extra_fields):
        extra_fields.setdefault('is_staff', True)
        extra_fields.setdefault('is_superuser', True)

        if extra_fields.get('is_staff') is not True:
            raise ValueError('Superuser must have is_staff=True.')
        if extra_fields.get('is_superuser') is not True:
            raise ValueError('Superuser must have is_superuser=True.')

        return self._create_user(email, password, **extra_fields)


class UserLastActivityMixin(models.Model):
    last_activity = models.DateTimeField(
        _('last activity'), default=timezone.now, editable=False)

    def update_last_activity(self):
        self.last_activity = timezone.now()
        self.save(update_fields=["last_activity"])

    class Meta:
        abstract = True


UserMixin = load_func(settings.USER_MIXIN)


class User(UserMixin, AbstractBaseUser, PermissionsMixin, UserLastActivityMixin):
    """
    An abstract base class implementing a fully featured User model with
    admin-compliant permissions.

    Username and password are required. Other fields are optional.
    """
    username = models.CharField(_('username'), max_length=256)
    email = models.EmailField(_('email address'), unique=True, blank=True)

    first_name = models.CharField(_('first name'), max_length=256, blank=True)
    last_name = models.CharField(_('last name'), max_length=256, blank=True)
    phone = models.CharField(_('phone'), max_length=256, blank=True)
    avatar = models.ImageField(upload_to=hash_upload, blank=True)

    is_staff = models.BooleanField(_('staff status'), default=False,
                                   help_text=_('Designates whether the user can log into this admin site.'))

    is_active = models.BooleanField(_('active'), default=True,
                                    help_text=_('Designates whether to treat this user as active. '
                                                'Unselect this instead of deleting accounts.'))

    date_joined = models.DateTimeField(_('date joined'), default=timezone.now)

    activity_at = models.DateTimeField(_('last annotation activity'), auto_now=True)

    active_organization = models.ForeignKey(
        'organizations.Organization',
        null=True,
        on_delete=models.SET_NULL,
        related_name='active_users'
    )

    allow_newsletters = models.BooleanField(
        _('allow newsletters'),
        null=True,
        default=None,
        help_text=_('Allow sending newsletters to user')
    )

    objects = UserManager()

    EMAIL_FIELD = 'email'
    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = ()

    class Meta:
        db_table = 'htx_user'
        verbose_name = _('user')
        verbose_name_plural = _('users')
        indexes = [
            models.Index(fields=['username']),
            models.Index(fields=['email']),
            models.Index(fields=['first_name']),
            models.Index(fields=['last_name']),
            models.Index(fields=['date_joined']),
        ]

    @property
    def avatar_url(self):
        if self.avatar:
            if settings.CLOUD_FILE_STORAGE_ENABLED:
                return self.avatar.url
            else:
                return settings.HOSTNAME + self.avatar.url

    def is_organization_admin(self, org_pk):
        return True

    def active_organization_annotations(self):
        return self.annotations.filter(project__organization=self.active_organization)

    def active_organization_contributed_project_number(self):
        annotations = self.active_organization_annotations()
        return annotations.values_list('project').distinct().count()

    @property
    def own_organization(self):
        return Organization.objects.get(created_by=self)

    @property
    def has_organization(self):
        return Organization.objects.filter(created_by=self).exists()

    def clean(self):
        super().clean()
        self.email = self.__class__.objects.normalize_email(self.email)

    def name_or_email(self):
        name = self.get_full_name()
        if len(name) == 0:
            name = self.email

        return name
        
    def get_full_name(self):
        """
        Return the first_name and the last_name for a given user with a space in between.
        """
        full_name = '%s %s' % (self.first_name, self.last_name)
        return full_name.strip()

    def get_short_name(self):
        """Return the short name for the user."""
        return self.first_name

    def reset_token(self):
        token = Token.objects.filter(user=self)
        if token.exists():
            token.delete()
        return Token.objects.create(user=self)
    
    def get_initials(self):
        initials = '?'
        if not self.first_name and not self.last_name:
            initials = self.email[0:2]
        elif self.first_name and not self.last_name:
            initials = self.first_name[0:1]
        elif self.last_name and not self.first_name:
            initials = self.last_name[0:1]
        elif self.first_name and self.last_name:
            initials = self.first_name[0:1] + self.last_name[0:1]
        return initials


@receiver(post_save, sender=User)
def init_user(sender, instance=None, created=False, **kwargs):
    if created:
        # create token for user
        Token.objects.create(user=instance)

我尝试将项目的created_by字段从管理员更改为用户,我认为这将使我有机会向用户展示项目,但我不知道如何实现这个想法。 另外,作为管理员,我想将项目分配给多个用户。

label-studio
1个回答
0
投票

mishra,有任何更新吗? 面临同样的问题

© www.soinside.com 2019 - 2024. All rights reserved.