forked from galaxyproject/galaxy
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrole.py
More file actions
68 lines (56 loc) · 1.96 KB
/
role.py
File metadata and controls
68 lines (56 loc) · 1.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from sqlalchemy import (
and_,
false,
select,
)
from galaxy.model import (
Role,
User,
UserRoleAssociation,
)
from galaxy.model.scoped_session import galaxy_scoped_session
def get_npns_roles(session):
"""
non-private, non-sharing roles
"""
stmt = (
select(Role)
.where(and_(Role.deleted == false(), Role.type != Role.types.PRIVATE, Role.type != Role.types.SHARING))
.order_by(Role.name)
)
return session.scalars(stmt)
def get_private_user_role(user, session):
stmt = (
select(Role)
.where(
and_(
UserRoleAssociation.user_id == user.id,
Role.id == UserRoleAssociation.role_id,
Role.type == Role.types.PRIVATE,
)
)
.distinct()
)
return session.execute(stmt).scalar_one_or_none()
def get_roles_by_ids(session: galaxy_scoped_session, role_ids):
stmt = select(Role).where(Role.id.in_(role_ids))
return session.scalars(stmt).all()
def get_displayable_roles(session, trans_user, user_is_admin, security_agent):
roles = []
stmt = select(Role).where(Role.deleted == false())
for role in session.scalars(stmt):
if user_is_admin or security_agent.ok_to_display(trans_user, role):
roles.append(role)
return roles
def get_private_role_user_emails_dict(session, role_ids: set[int] | None = None) -> dict[int, str]:
"""Return a mapping of private role ids to user emails.
If role_ids is provided, only return mappings for roles in that set,
avoiding a full table scan on large instances.
"""
if role_ids is not None and not role_ids:
return {}
stmt = select(UserRoleAssociation.role_id, User.email).join(Role).join(User).where(Role.type == Role.types.PRIVATE)
if role_ids is not None:
stmt = stmt.where(UserRoleAssociation.role_id.in_(role_ids))
roleid_email_tuples = session.execute(stmt).all()
return dict(roleid_email_tuples)