"""Account class implementation."""
import os
from array import array
from contextlib import contextmanager
from email.utils import parseaddr
from threading import Event
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Union
from . import const, hookspec
from .capi import ffi, lib
from .chat import Chat
from .contact import Contact
from .cutil import (
DCLot,
as_dc_charpointer,
from_dc_charpointer,
from_optional_dc_charpointer,
iter_array,
)
from .message import Message
from .tracker import ConfigureTracker, ImexTracker
if TYPE_CHECKING:
from .events import FFIEventTracker
class MissingCredentials(ValueError):
"""Account is missing `addr` and `mail_pw` config values."""
def get_core_info():
"""get some system info."""
from tempfile import NamedTemporaryFile
with NamedTemporaryFile() as path:
path.close()
return get_dc_info_as_dict(
ffi.gc(
lib.dc_context_new(as_dc_charpointer(""), as_dc_charpointer(path.name), ffi.NULL),
lib.dc_context_unref,
),
)
def get_dc_info_as_dict(dc_context):
lines = from_dc_charpointer(lib.dc_get_info(dc_context))
info_dict = {}
for line in lines.split("\n"):
if not line.strip():
continue
key, value = line.split("=", 1)
info_dict[key.lower()] = value
return info_dict
[docs]
class Account:
"""Each account is tied to a sqlite database file which is fully managed
by the underlying deltachat core library. All public Account methods are
meant to be memory-safe and return memory-safe objects.
"""
MissingCredentials = MissingCredentials
_logid: str
_evtracker: "FFIEventTracker"
def __init__(self, db_path, os_name=None, logging=True, closed=False) -> None:
from .events import EventThread
"""initialize account object.
:param db_path: a path to the account database. The database
will be created if it doesn't exist.
:param os_name: [Deprecated]
:param logging: enable logging for this account
:param closed: set to True to avoid automatically opening the account
after creation.
"""
# initialize per-account plugin system
self._pm = hookspec.PerAccount._make_plugin_manager()
self._logging = logging
self.add_account_plugin(self)
self.db_path = db_path
if hasattr(db_path, "encode"):
db_path = db_path.encode("utf8")
ptr = lib.dc_context_new_closed(db_path) if closed else lib.dc_context_new(ffi.NULL, db_path, ffi.NULL)
if ptr == ffi.NULL:
raise ValueError(f"Could not dc_context_new: {os_name} {db_path}")
self._dc_context = ffi.gc(
ptr,
lib.dc_context_unref,
)
self._shutdown_event = Event()
self._event_thread = EventThread(self)
self._configkeys = self.get_config("sys.config_keys").split()
hook = hookspec.Global._get_plugin_manager().hook
hook.dc_account_init(account=self)
[docs]
def open(self, passphrase: Optional[str] = None) -> bool:
"""Open the account's database with the given passphrase.
This can only be used on a closed account. If the account is new, this
operation sets the database passphrase. For existing databases the passphrase
should be the one used to encrypt the database the first time.
:returns: True if the database is opened with this passphrase, False if the
passphrase is incorrect or an error occurred.
"""
return bool(lib.dc_context_open(self._dc_context, as_dc_charpointer(passphrase)))
[docs]
def disable_logging(self) -> None:
"""disable logging."""
self._logging = False
[docs]
def enable_logging(self) -> None:
"""re-enable logging."""
self._logging = True
def __repr__(self) -> str:
return f"<Account path={self.db_path}>"
# def __del__(self):
# self.shutdown()
def log(self, msg):
if self._logging:
self._pm.hook.ac_log_line(message=msg)
def _check_config_key(self, name: str) -> None:
if name not in self._configkeys:
raise KeyError(f"{name!r} not a valid config key, existing keys: {self._configkeys!r}")
[docs]
def get_info(self) -> Dict[str, str]:
"""return dictionary of built config parameters."""
return get_dc_info_as_dict(self._dc_context)
def dump_account_info(self, logfile):
def log(*args, **kwargs):
kwargs["file"] = logfile
print(*args, **kwargs)
log("=============== " + self.get_config("displayname") + " ===============")
cursor = 0
for name, val in self.get_info().items():
entry = f"{name.upper()}={val}"
if cursor + len(entry) > 80:
log("")
cursor = 0
log(entry, end=" ")
cursor += len(entry) + 1
log("")
[docs]
def set_stock_translation(self, id: int, string: str) -> None:
"""set stock translation string.
:param id: id of stock string (const.DC_STR_*)
:param value: string to set as new translation
:returns: None
"""
bytestring = string.encode("utf8")
res = lib.dc_set_stock_translation(self._dc_context, id, bytestring)
if res == 0:
raise ValueError("could not set translation string")
[docs]
def set_config(self, name: str, value: Optional[str]) -> None:
"""set configuration values.
:param name: config key name (unicode)
:param value: value to set (unicode)
:returns: None
"""
self._check_config_key(name)
namebytes = name.encode("utf8")
if isinstance(value, (int, bool)):
value = str(int(value))
valuebytes = value.encode("utf8") if value is not None else ffi.NULL
lib.dc_set_config(self._dc_context, namebytes, valuebytes)
[docs]
def get_config(self, name: str) -> str:
"""return unicode string value.
:param name: configuration key to lookup (eg "addr" or "mail_pw")
:returns: unicode value
:raises: KeyError if no config value was found.
"""
if name != "sys.config_keys":
self._check_config_key(name)
namebytes = name.encode("utf8")
res = lib.dc_get_config(self._dc_context, namebytes)
assert res != ffi.NULL, f"config value not found for: {name!r}"
return from_dc_charpointer(res)
def _preconfigure_keypair(self, secret: str) -> None:
"""See dc_preconfigure_keypair() in deltachat.h.
In other words, you don't need this.
"""
res = lib.dc_preconfigure_keypair(
self._dc_context,
as_dc_charpointer(secret),
)
if res == 0:
raise Exception("Failed to set key")
[docs]
def update_config(self, kwargs: Dict[str, Any]) -> None:
"""update config values.
:param kwargs: name=value config settings for this account.
values need to be unicode.
:returns: None
"""
for key, value in kwargs.items():
self.set_config(key, value)
[docs]
def is_open(self) -> bool:
"""Determine if account is open.
:returns True if account is open.
"""
return bool(lib.dc_context_is_open(self._dc_context))
[docs]
def set_avatar(self, img_path: Optional[str]) -> None:
"""Set self avatar.
:raises ValueError: if profile image could not be set
:returns: None
"""
if img_path is None:
self.set_config("selfavatar", None)
else:
assert os.path.exists(img_path), img_path
self.set_config("selfavatar", img_path)
[docs]
def get_latest_backupfile(self, backupdir) -> Optional[str]:
"""return the latest backup file in a given directory."""
res = lib.dc_imex_has_backup(self._dc_context, as_dc_charpointer(backupdir))
return from_optional_dc_charpointer(res)
[docs]
def get_blobdir(self) -> str:
"""return the directory for files.
All sent files are copied to this directory if necessary.
Place files there directly to avoid copying.
"""
return from_dc_charpointer(lib.dc_get_blobdir(self._dc_context))
def get_contact(self, obj) -> Optional[Contact]:
if isinstance(obj, Contact):
return obj
(_, addr) = self.get_contact_addr_and_name(obj)
return self.get_contact_by_addr(addr)
def get_contact_addr_and_name(self, obj, name: Optional[str] = None):
if isinstance(obj, Account):
if not obj.is_configured():
raise ValueError("can only add addresses from configured accounts")
addr, displayname = obj.get_config("addr"), obj.get_config("displayname")
elif isinstance(obj, Contact):
if obj.account != self:
raise ValueError(f"account mismatch {obj}")
addr, displayname = obj.addr, obj.name
elif isinstance(obj, str):
displayname, addr = parseaddr(obj)
else:
raise TypeError(f"don't know how to create chat for {obj!r}")
if name is None and displayname:
name = displayname
return (name, addr)
[docs]
def get_fresh_messages(self) -> Generator[Message, None, None]:
"""yield all fresh messages from all chats."""
dc_array = ffi.gc(lib.dc_get_fresh_msgs(self._dc_context), lib.dc_array_unref)
return (x for x in iter_array(dc_array, lambda x: Message.from_db(self, x)) if x is not None)
[docs]
def create_chat(self, obj) -> Chat:
"""Create a 1:1 chat with Account, Contact or e-mail address."""
return self.create_contact(obj).create_chat()
[docs]
def create_group_chat(
self,
name: str,
contacts: Optional[List[Contact]] = None,
verified: bool = False,
) -> Chat:
"""create a new group chat object.
Chats are unpromoted until the first message is sent.
:param contacts: list of contacts to add
:param verified: if true only verified contacts can be added.
:returns: a :class:`deltachat.chat.Chat` object.
"""
bytes_name = name.encode("utf8")
chat_id = lib.dc_create_group_chat(self._dc_context, int(verified), bytes_name)
chat = Chat(self, chat_id)
if contacts is not None:
for contact in contacts:
chat.add_contact(contact)
return chat
[docs]
def get_chats(self) -> List[Chat]:
"""return list of chats.
:returns: a list of :class:`deltachat.chat.Chat` objects.
"""
dc_chatlist = ffi.gc(lib.dc_get_chatlist(self._dc_context, 0, ffi.NULL, 0), lib.dc_chatlist_unref)
assert dc_chatlist != ffi.NULL
chatlist = []
for i in range(lib.dc_chatlist_get_cnt(dc_chatlist)):
chat_id = lib.dc_chatlist_get_chat_id(dc_chatlist, i)
chatlist.append(Chat(self, chat_id))
return chatlist
def get_device_chat(self) -> Chat:
return Contact(self, const.DC_CONTACT_ID_DEVICE).create_chat()
[docs]
def get_message_by_id(self, msg_id: int) -> Optional[Message]:
"""return Message instance.
:param msg_id: integer id of this message.
:returns: :class:`deltachat.message.Message` instance.
"""
return Message.from_db(self, msg_id)
[docs]
def get_chat_by_id(self, chat_id: int) -> Chat:
"""return Chat instance.
:param chat_id: integer id of this chat.
:returns: :class:`deltachat.chat.Chat` instance.
:raises: ValueError if chat does not exist.
"""
res = lib.dc_get_chat(self._dc_context, chat_id)
if res == ffi.NULL:
raise ValueError(f"cannot get chat with id={chat_id}")
lib.dc_chat_unref(res)
return Chat(self, chat_id)
[docs]
def mark_seen_messages(self, messages: List[Union[int, Message]]) -> None:
"""mark the given set of messages as seen.
:param messages: a list of message ids or Message instances.
"""
arr = array("i")
for msg in messages:
if isinstance(msg, Message):
arr.append(msg.id)
else:
arr.append(msg)
msg_ids = ffi.cast("uint32_t*", ffi.from_buffer(arr))
lib.dc_markseen_msgs(self._dc_context, msg_ids, len(messages))
[docs]
def forward_messages(self, messages: List[Message], chat: Chat) -> None:
"""Forward list of messages to a chat.
:param messages: list of :class:`deltachat.message.Message` object.
:param chat: :class:`deltachat.chat.Chat` object.
:returns: None
"""
msg_ids = [msg.id for msg in messages]
lib.dc_forward_msgs(self._dc_context, msg_ids, len(msg_ids), chat.id)
[docs]
def resend_messages(self, messages: List[Message]) -> None:
"""Resend list of messages.
:param messages: list of :class:`deltachat.message.Message` object.
:returns: None
"""
msg_ids = [msg.id for msg in messages]
if lib.dc_resend_msgs(self._dc_context, msg_ids, len(msg_ids)) != 1:
raise ValueError(f"could not resend messages {msg_ids}")
[docs]
def delete_messages(self, messages: List[Message]) -> None:
"""delete messages (local and remote).
:param messages: list of :class:`deltachat.message.Message` object.
:returns: None
"""
msg_ids = [msg.id for msg in messages]
lib.dc_delete_msgs(self._dc_context, msg_ids, len(msg_ids))
[docs]
def export_self_keys(self, path):
"""export public and private keys to the specified directory.
Note that the account does not have to be started.
"""
return self._export(path, imex_cmd=const.DC_IMEX_EXPORT_SELF_KEYS)
[docs]
def export_all(self, path: str, passphrase: Optional[str] = None) -> str:
"""Export a backup of all database state (chats, contacts, keys, media, ...).
:param path: the directory where the backup will be stored.
:param passphrase: the backup will be encrypted with the passphrase, if it is
None or empty string, the backup is not encrypted.
:returns: path to the created backup.
Note that the account has to be stopped; call stop_io() if necessary.
"""
export_files = self._export(path, const.DC_IMEX_EXPORT_BACKUP, passphrase)
if len(export_files) != 1:
raise RuntimeError("found more than one new file")
return export_files[0]
def _export(self, path: str, imex_cmd: int, passphrase: Optional[str] = None) -> list:
with self.temp_plugin(ImexTracker()) as imex_tracker:
self.imex(path, imex_cmd, passphrase)
return imex_tracker.wait_finish()
[docs]
def import_self_keys(self, path):
"""Import private keys found in the `path` directory.
The last imported key is made the default keys unless its name
contains the string legacy. Public keys are not imported.
Note that the account does not have to be started.
"""
self._import(path, imex_cmd=const.DC_IMEX_IMPORT_SELF_KEYS)
[docs]
def import_all(self, path: str, passphrase: Optional[str] = None) -> None:
"""Import Delta Chat state from the specified backup file.
The account must be in unconfigured state for import to attempted.
:param path: path to the backup file.
:param passphrase: if not None or empty, the backup will be opened with the passphrase.
"""
assert not self.is_configured(), "cannot import into configured account"
self._import(path, imex_cmd=const.DC_IMEX_IMPORT_BACKUP, passphrase=passphrase)
def _import(self, path: str, imex_cmd: int, passphrase: Optional[str] = None) -> None:
with self.temp_plugin(ImexTracker()) as imex_tracker:
self.imex(path, imex_cmd, passphrase)
imex_tracker.wait_finish()
def imex(self, path: str, imex_cmd: int, passphrase: Optional[str] = None) -> None:
lib.dc_imex(self._dc_context, imex_cmd, as_dc_charpointer(path), as_dc_charpointer(passphrase))
[docs]
def initiate_key_transfer(self) -> str:
"""return setup code after a Autocrypt setup message
has been successfully sent to our own e-mail address ("self-sent message").
If sending out was unsuccessful, a RuntimeError is raised.
"""
self.check_is_configured()
res = lib.dc_initiate_key_transfer(self._dc_context)
if res == ffi.NULL:
raise RuntimeError("could not send out autocrypt setup message")
return from_dc_charpointer(res)
[docs]
def check_qr(self, qr):
"""check qr code and return :class:`ScannedQRCode` instance representing the result."""
res = ffi.gc(lib.dc_check_qr(self._dc_context, as_dc_charpointer(qr)), lib.dc_lot_unref)
lot = DCLot(res)
if lot.state() == const.DC_QR_ERROR:
raise ValueError(f"invalid or unknown QR code: {lot.text1()}")
return ScannedQRCode(lot)
[docs]
def qr_join_chat(self, qr):
"""return a `Chat` instance for which the securejoin network
protocol has been started.
This function triggers a network protocol in the background between
the emitter of the QR code and this account.
:param qr: valid "join-group" QR code (all other QR codes will result in an exception)
"""
assert self.check_qr(qr).is_ask_verifygroup()
chat_id = lib.dc_join_securejoin(self._dc_context, as_dc_charpointer(qr))
if chat_id == 0:
raise ValueError("could not join group")
return Chat(self, chat_id)
[docs]
def set_location(self, latitude: float = 0.0, longitude: float = 0.0, accuracy: float = 0.0) -> None:
"""set a new location. It effects all chats where we currently
have enabled location streaming.
:param latitude: float (use 0.0 if not known)
:param longitude: float (use 0.0 if not known)
:param accuracy: float (use 0.0 if not known)
:raises: ValueError if no chat is currently streaming locations
:returns: None
"""
dc_res = lib.dc_set_location(self._dc_context, latitude, longitude, accuracy)
if dc_res == 0:
raise ValueError("no chat is streaming locations")
#
# meta API for start/stop and event based processing
#
[docs]
def run_account(self, addr=None, password=None, account_plugins=None, show_ffi=False, displayname=None):
"""get the account running, configure it if necessary. add plugins if provided.
:param addr: the email address of the account
:param password: the password of the account
:param account_plugins: a list of plugins to add
:param show_ffi: show low level ffi events
:param displayname: the display name of the account
"""
from .events import FFIEventLogger
if show_ffi:
log = FFIEventLogger(self)
self.add_account_plugin(log)
for plugin in account_plugins or []:
print("adding plugin", plugin)
self.add_account_plugin(plugin)
if not self.is_configured():
assert addr and password, "you must specify email and password once to configure this database/account"
self.set_config("addr", addr)
self.set_config("mail_pw", password)
self.set_config("bot", "1")
configtracker = self.configure()
configtracker.wait_finish()
if displayname:
self.set_config("displayname", displayname)
# start IO threads and configure if necessary
self.start_io()
[docs]
def add_account_plugin(self, plugin, name=None):
"""add an account plugin which implements one or more of
the :class:`deltachat.hookspec.PerAccount` hooks.
"""
if name and self._pm.has_plugin(name=name):
self._pm.unregister(name=name)
self._pm.register(plugin, name=name)
self._pm.check_pending()
return plugin
[docs]
def remove_account_plugin(self, plugin, name=None):
"""remove an account plugin."""
self._pm.unregister(plugin, name=name)
[docs]
@contextmanager
def temp_plugin(self, plugin):
"""run a with-block with the given plugin temporarily registered."""
self._pm.register(plugin)
yield plugin
self._pm.unregister(plugin)
[docs]
def stop_ongoing(self):
"""Stop ongoing securejoin, configuration or other core jobs."""
lib.dc_stop_ongoing_process(self._dc_context)
def get_connectivity(self):
return lib.dc_get_connectivity(self._dc_context)
def get_connectivity_html(self) -> str:
return from_dc_charpointer(lib.dc_get_connectivity_html(self._dc_context))
[docs]
def start_io(self):
"""start this account's IO scheduling (Rust-core async scheduler).
If this account is not configured an Exception is raised.
You need to call account.configure() and account.wait_configure_finish()
before.
You may call `stop_scheduler`, `wait_shutdown` or `shutdown` after the
account is started.
If you are using this from a test, you may want to call
wait_all_initial_fetches() afterwards.
:raises MissingCredentials: if `addr` and `mail_pw` values are not set.
:raises ConfigureFailed: if the account could not be configured.
:returns: None
"""
if not self.is_configured():
raise ValueError("account not configured, cannot start io")
lib.dc_start_io(self._dc_context)
[docs]
def maybe_network(self):
"""This function should be called when there is a hint
that the network is available again,
e.g. as a response to system event reporting network availability.
The library will try to send pending messages out immediately.
Moreover, to have a reliable state
when the app comes to foreground with network available,
it may be reasonable to call the function also at that moment.
It is okay to call the function unconditionally when there is
network available, however, calling the function
_without_ having network may interfere with the backoff algorithm
and will led to let the jobs fail faster, with fewer retries
and may avoid messages being sent out.
Finally, if the context was created by the dc_accounts_t account manager
(currently not implemented in the Python bindings),
use dc_accounts_maybe_network() instead of this function
"""
lib.dc_maybe_network(self._dc_context)
[docs]
def wait_shutdown(self) -> None:
"""wait until shutdown of this account has completed."""
self._shutdown_event.wait()
[docs]
def stop_io(self) -> None:
"""stop core IO scheduler if it is running."""
self.log("stop_ongoing")
self.stop_ongoing()
self.log("dc_stop_io (stop core IO scheduler)")
lib.dc_stop_io(self._dc_context)
[docs]
def shutdown(self) -> None:
"""shutdown and destroy account (stop callback thread, close and remove
underlying dc_context).
"""
if self._dc_context is None:
return
# mark the event thread for shutdown (latest on next incoming event)
self._event_thread.mark_shutdown()
# stop_io also causes an info event which will wake up
# the EventThread's inner loop and let it notice the shutdown marker.
self.stop_io()
self.log("wait for event thread to finish")
try:
self._event_thread.wait(timeout=5)
except RuntimeError as e:
self.log(f"Waiting for event thread failed: {e}")
if self._event_thread.is_alive():
self.log("WARN: event thread did not terminate yet, ignoring.")
self.log("remove dc_context references, making the Account unusable")
self._dc_context = None
self._shutdown_event.set()
hook = hookspec.Global._get_plugin_manager().hook
hook.dc_account_after_shutdown(account=self)
self.log("shutdown finished")
class ScannedQRCode:
def __init__(self, dc_lot) -> None:
self._dc_lot = dc_lot
def is_ask_verifycontact(self):
return self._dc_lot.state() == const.DC_QR_ASK_VERIFYCONTACT
def is_ask_verifygroup(self):
return self._dc_lot.state() == const.DC_QR_ASK_VERIFYGROUP
@property
def contact_id(self):
return self._dc_lot.id()