"""Account module."""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional, Union
from warnings import warn
from ._utils import AttrDict, futuremethod
from .chat import Chat
from .const import ChatlistFlag, ContactFlag, EventType, SpecialContactId
from .contact import Contact
from .message import Message
# These names are only needed for type annotations; importing them solely
# under TYPE_CHECKING keeps them out of the runtime import graph.
if TYPE_CHECKING:
    from .deltachat import DeltaChat
    from .rpc import Rpc
@dataclass
class Account:
    """Delta Chat account.

    Every operation is forwarded to the RPC client of the owning manager,
    passing this account's ``id`` as the first argument.
    """

    # Manager that owns this account; its ``rpc`` attribute is used for all calls.
    manager: "DeltaChat"
    # Account ID passed to every RPC call.
    id: int

    @property
    def _rpc(self) -> "Rpc":
        """RPC client of the manager that owns this account."""
        return self.manager.rpc

    def wait_for_event(self, event_type=None) -> AttrDict:
        """Wait until the next event and return it.

        :param event_type: if given, events whose ``kind`` differs are consumed
            and discarded until a matching event arrives; if None, the very
            next event is returned.
        """
        while True:
            next_event = AttrDict(self._rpc.wait_for_event(self.id))
            if event_type is None or next_event.kind == event_type:
                return next_event

    def clear_all_events(self) -> None:
        """Remove all queued-up events for a given account.

        Useful for tests.
        """
        self._rpc.clear_all_events(self.id)

    def remove(self) -> None:
        """Remove the account."""
        self._rpc.remove_account(self.id)

    def clone(self) -> "Account":
        """Clone given account.

        This uses backup-transfer via iroh, i.e. the 'Add second device' feature.

        :return: the newly added account that received the backup.
        """
        # Start providing the backup first; the returned callable is invoked
        # below, after the new account has fetched the backup.
        future = self._rpc.provide_backup.future(self.id)
        qr = self._rpc.get_backup_qr(self.id)
        new_account = self.manager.add_account()
        new_account._rpc.get_backup(new_account.id, qr)
        future()
        return new_account

    def start_io(self) -> None:
        """Start the account I/O."""
        self._rpc.start_io(self.id)

    def stop_io(self) -> None:
        """Stop the account I/O."""
        self._rpc.stop_io(self.id)

    def get_info(self) -> AttrDict:
        """Return dictionary of this account configuration parameters."""
        return AttrDict(self._rpc.get_info(self.id))

    def get_size(self) -> int:
        """Get the combined filesize of an account in bytes."""
        return self._rpc.get_account_file_size(self.id)

    def set_config(self, key: str, value: Optional[str] = None) -> None:
        """Set configuration value."""
        self._rpc.set_config(self.id, key, value)

    def get_config(self, key: str) -> Optional[str]:
        """Get configuration value."""
        return self._rpc.get_config(self.id, key)

    def update_config(self, **kwargs) -> None:
        """Update config values.

        Each keyword argument is applied with a separate `set_config()` call.
        """
        for key, value in kwargs.items():
            self.set_config(key, value)

    def set_avatar(self, img_path: Optional[str] = None) -> None:
        """Set self avatar.

        Passing None will discard the currently set avatar.
        """
        self.set_config("selfavatar", img_path)

    def get_avatar(self) -> Optional[str]:
        """Get self avatar."""
        return self.get_config("selfavatar")

    def check_qr(self, qr):
        """Parse QR code contents.

        This function takes the raw text scanned
        and checks what can be done with it.
        """
        return self._rpc.check_qr(self.id, qr)

    def set_config_from_qr(self, qr: str) -> None:
        """Set configuration values from a QR code."""
        self._rpc.set_config_from_qr(self.id, qr)

    @futuremethod
    def configure(self):
        """Configure an account."""
        yield self._rpc.configure.future(self.id)

    @futuremethod
    def add_or_update_transport(self, params):
        """Add a new transport."""
        yield self._rpc.add_or_update_transport.future(self.id, params)

    @futuremethod
    def list_transports(self):
        """Return the list of all email accounts that are used as a transport in the current profile."""
        transports = yield self._rpc.list_transports.future(self.id)
        return transports

    def bring_online(self) -> None:
        """Start I/O and wait until IMAP becomes IDLE."""
        self.start_io()
        self.wait_for_event(EventType.IMAP_INBOX_IDLE)

    def make_vcard(self, contacts: list[Contact]) -> str:
        """Create vCard with the given contacts.

        All contacts must belong to this account.
        """
        # Sanity check only: assert statements are stripped when Python runs with -O.
        assert all(contact.account == self for contact in contacts)
        contact_ids = [contact.id for contact in contacts]
        return self._rpc.make_vcard(self.id, contact_ids)

    def import_vcard(self, vcard: str) -> list[Contact]:
        """Import vCard.

        Return created or modified contacts in the order they appear in vCard.
        """
        contact_ids = self._rpc.import_vcard_contents(self.id, vcard)
        return [Contact(self, contact_id) for contact_id in contact_ids]

    def create_chat(self, account: "Account") -> Chat:
        """Create a 1:1 chat with another account."""
        # NOTE(review): create_contact() is not defined in the visible part of
        # this class -- confirm it exists before relying on this method.
        return self.create_contact(account).create_chat()

    def get_device_chat(self) -> Chat:
        """Return device chat."""
        return self.device_contact.create_chat()

    @property
    def self_contact(self) -> Contact:
        """Account's identity as a Contact."""
        return Contact(self, SpecialContactId.SELF)

    @property
    def device_contact(self) -> Contact:
        """Account's device contact."""
        return Contact(self, SpecialContactId.DEVICE)

    def get_chatlist(
        self,
        query: Optional[str] = None,
        contact: Optional[Contact] = None,
        archived_only: bool = False,
        for_forwarding: bool = False,
        no_specials: bool = False,
        alldone_hint: bool = False,
        snapshot: bool = False,
    ) -> Union[list[Chat], list[AttrDict]]:
        """Return list of chats.

        :param query: if a string is specified only chats matching this query are returned.
        :param contact: if a contact is specified only chats including this contact are returned.
        :param archived_only: if True only archived chats are returned.
        :param for_forwarding: if True the chat list is sorted with "Saved messages" at the top
                               and without "Device chat" and contact requests.
        :param no_specials: if True archive link is not added to the list.
        :param alldone_hint: if True the "all done hint" special chat will be added to the list
                             as needed.
        :param snapshot: If True return a list of chat snapshots instead of Chat instances.
        """
        flags = 0
        if archived_only:
            flags |= ChatlistFlag.ARCHIVED_ONLY
        if for_forwarding:
            flags |= ChatlistFlag.FOR_FORWARDING
        if no_specials:
            flags |= ChatlistFlag.NO_SPECIALS
        if alldone_hint:
            flags |= ChatlistFlag.ADD_ALLDONE_HINT

        contact_id = contact.id if contact is not None else None
        entries = self._rpc.get_chatlist_entries(self.id, flags, query, contact_id)
        if not snapshot:
            return [Chat(self, entry) for entry in entries]

        items = self._rpc.get_chatlist_items_by_entries(self.id, entries)
        chats = []
        for item in items.values():
            # Attach a Chat handle to each snapshot item under the "chat" key.
            item["chat"] = Chat(self, item["id"])
            chats.append(AttrDict(item))
        return chats

    def create_group(self, name: str, protect: bool = False) -> Chat:
        """Create a new group chat.

        After creation,
        the group has the self-contact as its only member (see `SpecialContactId.SELF`)
        and is in _unpromoted_ state.
        This means, you can add or remove members, change the name,
        the group image and so on without messages being sent to all group members.

        This changes as soon as the first message is sent to the group members
        and the group becomes _promoted_.
        After that, all changes are synced with all group members
        by sending status message.

        To check, if a chat is still unpromoted, you can look at the `is_unpromoted` property of a chat
        (see `get_full_snapshot()` / `get_basic_snapshot()`).
        This may be useful if you want to show some help for just created groups.

        :param name: name of the new group.
        :param protect: If True the function creates group with protection initially enabled.
                        Only verified members are allowed in these groups
                        and end-to-end-encryption is always enabled.
        """
        return Chat(self, self._rpc.create_group_chat(self.id, name, protect))

    def create_broadcast(self, name: str) -> Chat:
        """Create a new **broadcast channel**
        (called "Channel" in the UI).

        Broadcast channels are similar to groups on the sending device,
        however, recipients get the messages in a read-only chat
        and will not see who the other members are.

        Called `broadcast` here rather than `channel`,
        because the word "channel" already appears a lot in the code,
        which would make it hard to grep for it.

        After creation, the chat contains no recipients and is in _unpromoted_ state;
        see `create_group()` for more information on the unpromoted state.

        Returns the created chat.
        """
        return Chat(self, self._rpc.create_broadcast(self.id, name))

    def get_chat_by_id(self, chat_id: int) -> Chat:
        """Return the Chat instance with the given ID."""
        return Chat(self, chat_id)

    def secure_join(self, qrdata: str) -> Chat:
        """Continue a Setup-Contact or Verified-Group-Invite protocol started on another device.

        The function returns immediately and the handshake runs in background, sending
        and receiving several messages.
        Subsequent calls of `secure_join()` will abort previous, unfinished handshakes.
        See https://securejoin.delta.chat/ for protocol details.

        :param qrdata: The text of the scanned QR code.
        """
        return Chat(self, self._rpc.secure_join(self.id, qrdata))

    def get_qr_code(self) -> str:
        """Get Setup-Contact QR Code text.

        This data needs to be transferred to another Delta Chat account
        in a second channel, typically used by mobiles with QRcode-show + scan UX.
        """
        return self._rpc.get_chat_securejoin_qr_code(self.id, None)

    def get_qr_code_svg(self) -> tuple[str, str]:
        """Get Setup-Contact QR code text and SVG."""
        return self._rpc.get_chat_securejoin_qr_code_svg(self.id, None)

    def get_message_by_id(self, msg_id: int) -> Message:
        """Return the Message instance with the given ID."""
        return Message(self, msg_id)

    def mark_seen_messages(self, messages: list[Message]) -> None:
        """Mark the given set of messages as seen."""
        self._rpc.markseen_msgs(self.id, [msg.id for msg in messages])

    def delete_messages(self, messages: list[Message]) -> None:
        """Delete messages (local and remote)."""
        self._rpc.delete_messages(self.id, [msg.id for msg in messages])

    def get_fresh_messages(self) -> list[Message]:
        """Return the list of fresh messages, newest messages first.

        This call is intended for displaying notifications.
        Bots were previously pointed at `get_fresh_messages_in_arrival_order()`
        to process oldest messages first; that method is deprecated
        in favour of `get_next_messages()`.
        """
        fresh_msg_ids = self._rpc.get_fresh_msgs(self.id)
        return [Message(self, msg_id) for msg_id in fresh_msg_ids]

    def get_next_messages(self) -> list[Message]:
        """Return a list of next messages."""
        next_msg_ids = self._rpc.get_next_msgs(self.id)
        return [Message(self, msg_id) for msg_id in next_msg_ids]

    def wait_next_messages(self) -> list[Message]:
        """Wait for new messages and return a list of them."""
        next_msg_ids = self._rpc.wait_next_msgs(self.id)
        return [Message(self, msg_id) for msg_id in next_msg_ids]

    def wait_for_incoming_msg_event(self) -> AttrDict:
        """Wait for incoming message event and return it."""
        return self.wait_for_event(EventType.INCOMING_MSG)

    def wait_for_msgs_changed_event(self) -> AttrDict:
        """Wait for messages changed event and return it."""
        return self.wait_for_event(EventType.MSGS_CHANGED)

    def wait_for_msgs_noticed_event(self) -> AttrDict:
        """Wait for messages noticed event and return it."""
        return self.wait_for_event(EventType.MSGS_NOTICED)

    def wait_for_incoming_msg(self) -> Message:
        """Wait for incoming message and return it.

        Consumes all events before the next incoming message event.
        """
        return self.get_message_by_id(self.wait_for_incoming_msg_event().msg_id)

    def _wait_for_securejoin_success(self, kind: str) -> None:
        """Consume events until one of the given kind reports progress 1000."""
        while True:
            event = self.wait_for_event()
            if event["kind"] == kind and event["progress"] == 1000:
                return

    def wait_for_securejoin_inviter_success(self) -> None:
        """Wait until SecureJoin process finishes successfully on the inviter side."""
        self._wait_for_securejoin_success("SecurejoinInviterProgress")

    def wait_for_securejoin_joiner_success(self) -> None:
        """Wait until SecureJoin process finishes successfully on the joiner side."""
        self._wait_for_securejoin_success("SecurejoinJoinerProgress")

    def wait_for_reactions_changed(self) -> AttrDict:
        """Wait for reaction change event."""
        return self.wait_for_event(EventType.REACTIONS_CHANGED)

    def get_fresh_messages_in_arrival_order(self) -> list[Message]:
        """Return fresh messages list sorted in the order of their arrival, with ascending IDs.

        Deprecated: emits a DeprecationWarning; use `get_next_messages()` instead.
        """
        warn(
            "get_fresh_messages_in_arrival_order is deprecated, use get_next_messages instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        fresh_msg_ids = sorted(self._rpc.get_fresh_msgs(self.id))
        return [Message(self, msg_id) for msg_id in fresh_msg_ids]

    def export_backup(self, path, passphrase: str = "") -> None:
        """Export backup.

        :param path: destination path; converted with ``str()`` before the RPC call.
        :param passphrase: passphrase forwarded to the RPC call (empty by default).
        """
        self._rpc.export_backup(self.id, str(path), passphrase)

    def import_backup(self, path, passphrase: str = "") -> None:
        """Import backup.

        :param path: backup path; converted with ``str()`` before the RPC call.
        :param passphrase: passphrase forwarded to the RPC call (empty by default).
        """
        self._rpc.import_backup(self.id, str(path), passphrase)

    def export_self_keys(self, path) -> None:
        """Export keys."""
        passphrase = ""  # Setting passphrase is currently not supported.
        self._rpc.export_self_keys(self.id, str(path), passphrase)

    def import_self_keys(self, path) -> None:
        """Import keys."""
        passphrase = ""  # Importing passphrase-protected keys is currently not supported.
        self._rpc.import_self_keys(self.id, str(path), passphrase)

    def initiate_autocrypt_key_transfer(self) -> None:
        """Send Autocrypt Setup Message."""
        # NOTE(review): annotated as returning None, yet the RPC result is
        # returned to the caller -- confirm which one is intended.
        return self._rpc.initiate_autocrypt_key_transfer(self.id)