Source code for deltachat_rpc_client.account

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional, Union
from warnings import warn

from ._utils import AttrDict, futuremethod
from .chat import Chat
from .const import ChatlistFlag, ContactFlag, EventType, SpecialContactId
from .contact import Contact
from .message import Message

    from .deltachat import DeltaChat
    from .rpc import Rpc

[docs] @dataclass class Account: """Delta Chat account.""" manager: "DeltaChat" id: int @property def _rpc(self) -> "Rpc": return self.manager.rpc
[docs] def wait_for_event(self) -> AttrDict: """Wait until the next event and return it.""" return AttrDict(self._rpc.wait_for_event(
[docs] def clear_all_events(self): """Removes all queued-up events for a given account. Useful for tests.""" self._rpc.clear_all_events(
[docs] def remove(self) -> None: """Remove the account.""" self._rpc.remove_account(
[docs] def start_io(self) -> None: """Start the account I/O.""" self._rpc.start_io(
[docs] def stop_io(self) -> None: """Stop the account I/O.""" self._rpc.stop_io(
[docs] def get_info(self) -> AttrDict: """Return dictionary of this account configuration parameters.""" return AttrDict(self._rpc.get_info(
[docs] def get_size(self) -> int: """Get the combined filesize of an account in bytes.""" return self._rpc.get_account_file_size(
[docs] def is_configured(self) -> bool: """Return True if this account is configured.""" return self._rpc.is_configured(
[docs] def set_config(self, key: str, value: Optional[str] = None) -> None: """Set configuration value.""" self._rpc.set_config(, key, value)
[docs] def get_config(self, key: str) -> Optional[str]: """Get configuration value.""" return self._rpc.get_config(, key)
[docs] def update_config(self, **kwargs) -> None: """update config values.""" for key, value in kwargs.items(): self.set_config(key, value)
[docs] def set_avatar(self, img_path: Optional[str] = None) -> None: """Set self avatar. Passing None will discard the currently set avatar. """ self.set_config("selfavatar", img_path)
[docs] def get_avatar(self) -> Optional[str]: """Get self avatar.""" return self.get_config("selfavatar")
def check_qr(self, qr): return self._rpc.check_qr(, qr) def set_config_from_qr(self, qr: str): self._rpc.set_config_from_qr(, qr) @futuremethod def configure(self): """Configure an account.""" yield self._rpc.configure.future(
[docs] def bring_online(self): """Start I/O and wait until IMAP becomes IDLE.""" self.start_io() while True: event = self.wait_for_event() if event.kind == EventType.IMAP_INBOX_IDLE: break
[docs] def create_contact(self, obj: Union[int, str, Contact], name: Optional[str] = None) -> Contact: """Create a new Contact or return an existing one. Calling this method will always result in the same underlying contact id. If there already is a Contact with that e-mail address, it is unblocked and its display name is updated if specified. :param obj: email-address or contact id. :param name: (optional) display name for this contact. """ if isinstance(obj, int): obj = Contact(self, obj) if isinstance(obj, Contact): obj = obj.get_snapshot().address return Contact(self, self._rpc.create_contact(, obj, name))
def create_chat(self, account: "Account") -> Chat: addr = account.get_config("addr") contact = self.create_contact(addr) return contact.create_chat()
[docs] def get_contact_by_id(self, contact_id: int) -> Contact: """Return Contact instance for the given contact ID.""" return Contact(self, contact_id)
[docs] def get_contact_by_addr(self, address: str) -> Optional[Contact]: """Check if an e-mail address belongs to a known and unblocked contact.""" contact_id = self._rpc.lookup_contact_id_by_addr(, address) return contact_id and Contact(self, contact_id)
[docs] def get_blocked_contacts(self) -> list[AttrDict]: """Return a list with snapshots of all blocked contacts.""" contacts = self._rpc.get_blocked_contacts( return [AttrDict(contact=Contact(self, contact["id"]), **contact) for contact in contacts]
[docs] def get_chat_by_contact(self, contact: Union[int, Contact]) -> Optional[Chat]: """Return 1:1 chat for a contact if it exists.""" if isinstance(contact, Contact): assert contact.account == self contact_id = elif isinstance(contact, int): contact_id = contact else: raise ValueError(f"{contact!r} is not a contact") chat_id = self._rpc.get_chat_id_by_contact_id(, contact_id) if chat_id: return Chat(self, chat_id) return None
[docs] def get_contacts( self, query: Optional[str] = None, with_self: bool = False, verified_only: bool = False, snapshot: bool = False, ) -> Union[list[Contact], list[AttrDict]]: """Get a filtered list of contacts. :param query: if a string is specified, only return contacts whose name or e-mail matches query. :param with_self: if True the self-contact is also included if it matches the query. :param only_verified: if True only return verified contacts. :param snapshot: If True return a list of contact snapshots instead of Contact instances. """ flags = 0 if verified_only: flags |= ContactFlag.VERIFIED_ONLY if with_self: flags |= ContactFlag.ADD_SELF if snapshot: contacts = self._rpc.get_contacts(, flags, query) return [AttrDict(contact=Contact(self, contact["id"]), **contact) for contact in contacts] contacts = self._rpc.get_contact_ids(, flags, query) return [Contact(self, contact_id) for contact_id in contacts]
@property def self_contact(self) -> Contact: """This account's identity as a Contact.""" return Contact(self, SpecialContactId.SELF)
[docs] def get_chatlist( self, query: Optional[str] = None, contact: Optional[Contact] = None, archived_only: bool = False, for_forwarding: bool = False, no_specials: bool = False, alldone_hint: bool = False, snapshot: bool = False, ) -> Union[list[Chat], list[AttrDict]]: """Return list of chats. :param query: if a string is specified only chats matching this query are returned. :param contact: if a contact is specified only chats including this contact are returned. :param archived_only: if True only archived chats are returned. :param for_forwarding: if True the chat list is sorted with "Saved messages" at the top and without "Device chat" and contact requests. :param no_specials: if True archive link is not added to the list. :param alldone_hint: if True the "all done hint" special chat will be added to the list as needed. :param snapshot: If True return a list of chat snapshots instead of Chat instances. """ flags = 0 if archived_only: flags |= ChatlistFlag.ARCHIVED_ONLY if for_forwarding: flags |= ChatlistFlag.FOR_FORWARDING if no_specials: flags |= ChatlistFlag.NO_SPECIALS if alldone_hint: flags |= ChatlistFlag.ADD_ALLDONE_HINT entries = self._rpc.get_chatlist_entries(, flags, query, contact and if not snapshot: return [Chat(self, entry) for entry in entries] items = self._rpc.get_chatlist_items_by_entries(, entries) chats = [] for item in items.values(): item["chat"] = Chat(self, item["id"]) chats.append(AttrDict(item)) return chats
[docs] def create_group(self, name: str, protect: bool = False) -> Chat: """Create a new group chat. After creation, the group has only self-contact as member and is in unpromoted state. """ return Chat(self, self._rpc.create_group_chat(, name, protect))
[docs] def get_chat_by_id(self, chat_id: int) -> Chat: """Return the Chat instance with the given ID.""" return Chat(self, chat_id)
[docs] def secure_join(self, qrdata: str) -> Chat: """Continue a Setup-Contact or Verified-Group-Invite protocol started on another device. The function returns immediately and the handshake runs in background, sending and receiving several messages. Subsequent calls of `secure_join()` will abort previous, unfinished handshakes. See for protocol details. :param qrdata: The text of the scanned QR code. """ return Chat(self, self._rpc.secure_join(, qrdata))
[docs] def get_qr_code(self) -> tuple[str, str]: """Get Setup-Contact QR Code text and SVG data. this data needs to be transferred to another Delta Chat account in a second channel, typically used by mobiles with QRcode-show + scan UX. """ return self._rpc.get_chat_securejoin_qr_code_svg(, None)
[docs] def get_message_by_id(self, msg_id: int) -> Message: """Return the Message instance with the given ID.""" return Message(self, msg_id)
[docs] def mark_seen_messages(self, messages: list[Message]) -> None: """Mark the given set of messages as seen.""" self._rpc.markseen_msgs(, [ for msg in messages])
[docs] def delete_messages(self, messages: list[Message]) -> None: """Delete messages (local and remote).""" self._rpc.delete_messages(, [ for msg in messages])
[docs] def get_fresh_messages(self) -> list[Message]: """Return the list of fresh messages, newest messages first. This call is intended for displaying notifications. If you are writing a bot, use `get_fresh_messages_in_arrival_order()` instead, to process oldest messages first. """ fresh_msg_ids = self._rpc.get_fresh_msgs( return [Message(self, msg_id) for msg_id in fresh_msg_ids]
[docs] def get_next_messages(self) -> list[Message]: """Return a list of next messages.""" next_msg_ids = self._rpc.get_next_msgs( return [Message(self, msg_id) for msg_id in next_msg_ids]
[docs] def wait_next_messages(self) -> list[Message]: """Wait for new messages and return a list of them.""" next_msg_ids = self._rpc.wait_next_msgs( return [Message(self, msg_id) for msg_id in next_msg_ids]
[docs] def wait_for_incoming_msg_event(self): """Wait for incoming message event and return it.""" while True: event = self.wait_for_event() if event.kind == EventType.INCOMING_MSG: return event
[docs] def wait_for_incoming_msg(self): """Wait for incoming message and return it. Consumes all events before the next incoming message event.""" return self.get_message_by_id(self.wait_for_incoming_msg_event().msg_id)
def wait_for_securejoin_inviter_success(self): while True: event = self.wait_for_event() if event["kind"] == "SecurejoinInviterProgress" and event["progress"] == 1000: break def wait_for_securejoin_joiner_success(self): while True: event = self.wait_for_event() if event["kind"] == "SecurejoinJoinerProgress" and event["progress"] == 1000: break def wait_for_reactions_changed(self): while True: event = self.wait_for_event() if event.kind == EventType.REACTIONS_CHANGED: return event
[docs] def get_fresh_messages_in_arrival_order(self) -> list[Message]: """Return fresh messages list sorted in the order of their arrival, with ascending IDs.""" warn( "get_fresh_messages_in_arrival_order is deprecated, use get_next_messages instead.", DeprecationWarning, stacklevel=2, ) fresh_msg_ids = sorted(self._rpc.get_fresh_msgs( return [Message(self, msg_id) for msg_id in fresh_msg_ids]
[docs] def export_backup(self, path, passphrase: str = "") -> None: """Export backup.""" self._rpc.export_backup(, str(path), passphrase)
[docs] def import_backup(self, path, passphrase: str = "") -> None: """Import backup.""" self._rpc.import_backup(, str(path), passphrase)
[docs] def export_self_keys(self, path) -> None: """Export keys.""" passphrase = "" # Setting passphrase is currently not supported. self._rpc.export_self_keys(, str(path), passphrase)
[docs] def import_self_keys(self, path) -> None: """Import keys.""" passphrase = "" # Importing passphrase-protected keys is currently not supported. self._rpc.import_self_keys(, str(path), passphrase)