Source code for deltachat_rpc_client.account

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional, Union
from warnings import warn

from ._utils import AttrDict, futuremethod
from .chat import Chat
from .const import ChatlistFlag, ContactFlag, EventType, SpecialContactId
from .contact import Contact
from .message import Message

if TYPE_CHECKING:
    from .deltachat import DeltaChat
    from .rpc import Rpc


[docs] @dataclass class Account: """Delta Chat account.""" manager: "DeltaChat" id: int @property def _rpc(self) -> "Rpc": return self.manager.rpc
[docs] def wait_for_event(self) -> AttrDict: """Wait until the next event and return it.""" return AttrDict(self._rpc.wait_for_event(self.id))
[docs] def clear_all_events(self): """Removes all queued-up events for a given account. Useful for tests.""" self._rpc.clear_all_events(self.id)
[docs] def remove(self) -> None: """Remove the account.""" self._rpc.remove_account(self.id)
[docs] def start_io(self) -> None: """Start the account I/O.""" self._rpc.start_io(self.id)
[docs] def stop_io(self) -> None: """Stop the account I/O.""" self._rpc.stop_io(self.id)
[docs] def get_info(self) -> AttrDict: """Return dictionary of this account configuration parameters.""" return AttrDict(self._rpc.get_info(self.id))
[docs] def get_size(self) -> int: """Get the combined filesize of an account in bytes.""" return self._rpc.get_account_file_size(self.id)
[docs] def is_configured(self) -> bool: """Return True if this account is configured.""" return self._rpc.is_configured(self.id)
[docs] def set_config(self, key: str, value: Optional[str] = None) -> None: """Set configuration value.""" self._rpc.set_config(self.id, key, value)
[docs] def get_config(self, key: str) -> Optional[str]: """Get configuration value.""" return self._rpc.get_config(self.id, key)
[docs] def update_config(self, **kwargs) -> None: """update config values.""" for key, value in kwargs.items(): self.set_config(key, value)
[docs] def set_avatar(self, img_path: Optional[str] = None) -> None: """Set self avatar. Passing None will discard the currently set avatar. """ self.set_config("selfavatar", img_path)
[docs] def get_avatar(self) -> Optional[str]: """Get self avatar.""" return self.get_config("selfavatar")
def check_qr(self, qr): return self._rpc.check_qr(self.id, qr) def set_config_from_qr(self, qr: str): self._rpc.set_config_from_qr(self.id, qr) @futuremethod def configure(self): """Configure an account.""" yield self._rpc.configure.future(self.id)
[docs] def bring_online(self): """Start I/O and wait until IMAP becomes IDLE.""" self.start_io() while True: event = self.wait_for_event() if event.kind == EventType.IMAP_INBOX_IDLE: break
[docs] def create_contact(self, obj: Union[int, str, Contact], name: Optional[str] = None) -> Contact: """Create a new Contact or return an existing one. Calling this method will always result in the same underlying contact id. If there already is a Contact with that e-mail address, it is unblocked and its display name is updated if specified. :param obj: email-address or contact id. :param name: (optional) display name for this contact. """ if isinstance(obj, int): obj = Contact(self, obj) if isinstance(obj, Contact): obj = obj.get_snapshot().address return Contact(self, self._rpc.create_contact(self.id, obj, name))
def create_chat(self, account: "Account") -> Chat: addr = account.get_config("addr") contact = self.create_contact(addr) return contact.create_chat()
[docs] def get_contact_by_id(self, contact_id: int) -> Contact: """Return Contact instance for the given contact ID.""" return Contact(self, contact_id)
[docs] def get_contact_by_addr(self, address: str) -> Optional[Contact]: """Check if an e-mail address belongs to a known and unblocked contact.""" contact_id = self._rpc.lookup_contact_id_by_addr(self.id, address) return contact_id and Contact(self, contact_id)
[docs] def get_blocked_contacts(self) -> list[AttrDict]: """Return a list with snapshots of all blocked contacts.""" contacts = self._rpc.get_blocked_contacts(self.id) return [AttrDict(contact=Contact(self, contact["id"]), **contact) for contact in contacts]
[docs] def get_chat_by_contact(self, contact: Union[int, Contact]) -> Optional[Chat]: """Return 1:1 chat for a contact if it exists.""" if isinstance(contact, Contact): assert contact.account == self contact_id = contact.id elif isinstance(contact, int): contact_id = contact else: raise ValueError(f"{contact!r} is not a contact") chat_id = self._rpc.get_chat_id_by_contact_id(self.id, contact_id) if chat_id: return Chat(self, chat_id) return None
[docs] def get_contacts( self, query: Optional[str] = None, with_self: bool = False, verified_only: bool = False, snapshot: bool = False, ) -> Union[list[Contact], list[AttrDict]]: """Get a filtered list of contacts. :param query: if a string is specified, only return contacts whose name or e-mail matches query. :param with_self: if True the self-contact is also included if it matches the query. :param only_verified: if True only return verified contacts. :param snapshot: If True return a list of contact snapshots instead of Contact instances. """ flags = 0 if verified_only: flags |= ContactFlag.VERIFIED_ONLY if with_self: flags |= ContactFlag.ADD_SELF if snapshot: contacts = self._rpc.get_contacts(self.id, flags, query) return [AttrDict(contact=Contact(self, contact["id"]), **contact) for contact in contacts] contacts = self._rpc.get_contact_ids(self.id, flags, query) return [Contact(self, contact_id) for contact_id in contacts]
@property def self_contact(self) -> Contact: """This account's identity as a Contact.""" return Contact(self, SpecialContactId.SELF)
[docs] def get_chatlist( self, query: Optional[str] = None, contact: Optional[Contact] = None, archived_only: bool = False, for_forwarding: bool = False, no_specials: bool = False, alldone_hint: bool = False, snapshot: bool = False, ) -> Union[list[Chat], list[AttrDict]]: """Return list of chats. :param query: if a string is specified only chats matching this query are returned. :param contact: if a contact is specified only chats including this contact are returned. :param archived_only: if True only archived chats are returned. :param for_forwarding: if True the chat list is sorted with "Saved messages" at the top and without "Device chat" and contact requests. :param no_specials: if True archive link is not added to the list. :param alldone_hint: if True the "all done hint" special chat will be added to the list as needed. :param snapshot: If True return a list of chat snapshots instead of Chat instances. """ flags = 0 if archived_only: flags |= ChatlistFlag.ARCHIVED_ONLY if for_forwarding: flags |= ChatlistFlag.FOR_FORWARDING if no_specials: flags |= ChatlistFlag.NO_SPECIALS if alldone_hint: flags |= ChatlistFlag.ADD_ALLDONE_HINT entries = self._rpc.get_chatlist_entries(self.id, flags, query, contact and contact.id) if not snapshot: return [Chat(self, entry) for entry in entries] items = self._rpc.get_chatlist_items_by_entries(self.id, entries) chats = [] for item in items.values(): item["chat"] = Chat(self, item["id"]) chats.append(AttrDict(item)) return chats
[docs] def create_group(self, name: str, protect: bool = False) -> Chat: """Create a new group chat. After creation, the group has only self-contact as member and is in unpromoted state. """ return Chat(self, self._rpc.create_group_chat(self.id, name, protect))
[docs] def get_chat_by_id(self, chat_id: int) -> Chat: """Return the Chat instance with the given ID.""" return Chat(self, chat_id)
[docs] def secure_join(self, qrdata: str) -> Chat: """Continue a Setup-Contact or Verified-Group-Invite protocol started on another device. The function returns immediately and the handshake runs in background, sending and receiving several messages. Subsequent calls of `secure_join()` will abort previous, unfinished handshakes. See https://securejoin.delta.chat/ for protocol details. :param qrdata: The text of the scanned QR code. """ return Chat(self, self._rpc.secure_join(self.id, qrdata))
[docs] def get_qr_code(self) -> tuple[str, str]: """Get Setup-Contact QR Code text and SVG data. this data needs to be transferred to another Delta Chat account in a second channel, typically used by mobiles with QRcode-show + scan UX. """ return self._rpc.get_chat_securejoin_qr_code_svg(self.id, None)
[docs] def get_message_by_id(self, msg_id: int) -> Message: """Return the Message instance with the given ID.""" return Message(self, msg_id)
[docs] def mark_seen_messages(self, messages: list[Message]) -> None: """Mark the given set of messages as seen.""" self._rpc.markseen_msgs(self.id, [msg.id for msg in messages])
[docs] def delete_messages(self, messages: list[Message]) -> None: """Delete messages (local and remote).""" self._rpc.delete_messages(self.id, [msg.id for msg in messages])
[docs] def get_fresh_messages(self) -> list[Message]: """Return the list of fresh messages, newest messages first. This call is intended for displaying notifications. If you are writing a bot, use `get_fresh_messages_in_arrival_order()` instead, to process oldest messages first. """ fresh_msg_ids = self._rpc.get_fresh_msgs(self.id) return [Message(self, msg_id) for msg_id in fresh_msg_ids]
[docs] def get_next_messages(self) -> list[Message]: """Return a list of next messages.""" next_msg_ids = self._rpc.get_next_msgs(self.id) return [Message(self, msg_id) for msg_id in next_msg_ids]
[docs] def wait_next_messages(self) -> list[Message]: """Wait for new messages and return a list of them.""" next_msg_ids = self._rpc.wait_next_msgs(self.id) return [Message(self, msg_id) for msg_id in next_msg_ids]
[docs] def wait_for_incoming_msg_event(self): """Wait for incoming message event and return it.""" while True: event = self.wait_for_event() if event.kind == EventType.INCOMING_MSG: return event
[docs] def wait_for_incoming_msg(self): """Wait for incoming message and return it. Consumes all events before the next incoming message event.""" return self.get_message_by_id(self.wait_for_incoming_msg_event().msg_id)
def wait_for_securejoin_inviter_success(self): while True: event = self.wait_for_event() if event["kind"] == "SecurejoinInviterProgress" and event["progress"] == 1000: break def wait_for_securejoin_joiner_success(self): while True: event = self.wait_for_event() if event["kind"] == "SecurejoinJoinerProgress" and event["progress"] == 1000: break def wait_for_reactions_changed(self): while True: event = self.wait_for_event() if event.kind == EventType.REACTIONS_CHANGED: return event
[docs] def get_fresh_messages_in_arrival_order(self) -> list[Message]: """Return fresh messages list sorted in the order of their arrival, with ascending IDs.""" warn( "get_fresh_messages_in_arrival_order is deprecated, use get_next_messages instead.", DeprecationWarning, stacklevel=2, ) fresh_msg_ids = sorted(self._rpc.get_fresh_msgs(self.id)) return [Message(self, msg_id) for msg_id in fresh_msg_ids]
[docs] def export_backup(self, path, passphrase: str = "") -> None: """Export backup.""" self._rpc.export_backup(self.id, str(path), passphrase)
[docs] def import_backup(self, path, passphrase: str = "") -> None: """Import backup.""" self._rpc.import_backup(self.id, str(path), passphrase)
[docs] def export_self_keys(self, path) -> None: """Export keys.""" passphrase = "" # Setting passphrase is currently not supported. self._rpc.export_self_keys(self.id, str(path), passphrase)
[docs] def import_self_keys(self, path) -> None: """Import keys.""" passphrase = "" # Importing passphrase-protected keys is currently not supported. self._rpc.import_self_keys(self.id, str(path), passphrase)