"""The Message object."""
import json
import os
import re
from datetime import datetime, timezone
from typing import Optional, Union
from . import const, props
from .capi import ffi, lib
from .cutil import as_dc_charpointer, from_dc_charpointer, from_optional_dc_charpointer
from .reactions import Reactions
[docs]
class Message:
"""Message object.
You obtain instances of it through :class:`deltachat.account.Account` or
:class:`deltachat.chat.Chat`.
"""
def __init__(self, account, dc_msg) -> None:
self.account = account
assert isinstance(self.account._dc_context, ffi.CData)
assert isinstance(dc_msg, ffi.CData)
assert dc_msg != ffi.NULL
self._dc_msg = dc_msg
msg_id = self.id
assert msg_id is not None and msg_id >= 0, repr(msg_id)
def __eq__(self, other) -> bool:
if other is None:
return False
return self.account == other.account and self.id == other.id
def __repr__(self) -> str:
c = self.get_sender_contact()
typ = "outgoing" if self.is_outgoing() else "incoming"
return (
f"<Message {typ} sys={self.is_system_message()} {repr(self.text[:100])} "
f"id={self.id} sender={c.id}/{c.addr} chat={self.chat.id}/{self.chat.get_name()}>"
)
[docs]
@classmethod
def from_db(cls, account, id) -> Optional["Message"]:
"""Attempt to load the message from the database given its ID.
None is returned if the message does not exist, i.e. deleted."""
assert id > 0
res = lib.dc_get_msg(account._dc_context, id)
if res == ffi.NULL:
return None
return cls(account, ffi.gc(res, lib.dc_msg_unref))
[docs]
@classmethod
def new_empty(cls, account, view_type):
"""create a non-persistent message.
:param view_type: the message type code or one of the strings:
"text", "audio", "video", "file", "sticker", "videochat", "webxdc"
"""
view_type_code = view_type if isinstance(view_type, int) else get_viewtype_code_from_name(view_type)
return Message(
account,
ffi.gc(lib.dc_msg_new(account._dc_context, view_type_code), lib.dc_msg_unref),
)
[docs]
def create_chat(self):
"""create or get an existing chat (group) object for this message.
If the message is a contact request
the sender will become an accepted contact.
:returns: a :class:`deltachat.chat.Chat` object.
"""
self.chat.accept()
return self.chat
@props.with_doc
def id(self):
"""id of this message."""
return lib.dc_msg_get_id(self._dc_msg)
@props.with_doc
def text(self) -> str:
"""unicode text of this messages (might be empty if not a text message)."""
return from_dc_charpointer(lib.dc_msg_get_text(self._dc_msg))
[docs]
def set_text(self, text):
"""set text of this message."""
lib.dc_msg_set_text(self._dc_msg, as_dc_charpointer(text))
@props.with_doc
def html(self) -> str:
"""html text of this messages (might be empty if not an html message)."""
return from_optional_dc_charpointer(lib.dc_get_msg_html(self.account._dc_context, self.id)) or ""
[docs]
def has_html(self):
"""return True if this message has an html part, False otherwise."""
return lib.dc_msg_has_html(self._dc_msg)
[docs]
def set_html(self, html_text):
"""set the html part of this message.
It is possible to have text and html part at the same time.
"""
lib.dc_msg_set_html(self._dc_msg, as_dc_charpointer(html_text))
@props.with_doc
def filename(self):
"""file path if there was an attachment, otherwise empty string.
If you want to get the file extension or a user-visible string,
use `basename` instead."""
return from_dc_charpointer(lib.dc_msg_get_file(self._dc_msg))
[docs]
def set_file(self, path, mime_type=None):
"""set file for this message from path and mime_type."""
mtype = ffi.NULL if mime_type is None else as_dc_charpointer(mime_type)
if not os.path.exists(path):
raise ValueError(f"path does not exist: {path!r}")
lib.dc_msg_set_file(self._dc_msg, as_dc_charpointer(path), mtype)
@props.with_doc
def basename(self) -> str:
"""The user-visible name of the attachment (incl. extension)
if it exists, otherwise empty string."""
# FIXME, it does not return basename
return from_dc_charpointer(lib.dc_msg_get_filename(self._dc_msg))
@props.with_doc
def filemime(self) -> str:
"""mime type of the file (if it exists)."""
return from_dc_charpointer(lib.dc_msg_get_filemime(self._dc_msg))
[docs]
def get_status_updates(self, serial: int = 0) -> list:
"""Get the status updates of this webxdc message.
The status updates may be sent by yourself or by other members.
If this message doesn't have a webxdc instance, an empty list is returned.
:param serial: The last known serial. Pass 0 if there are no known serials to receive all updates.
"""
return json.loads(
from_dc_charpointer(lib.dc_get_webxdc_status_updates(self.account._dc_context, self.id, serial)),
)
[docs]
def send_status_update(self, json_data: Union[str, dict], description: str) -> bool:
"""Send an status update for the webxdc instance of this message.
If the webxdc instance is a draft, the update is not sent immediately.
Instead, the updates are collected and sent out in a batch when the instance is actually sent.
:param json_data: program-readable data, the actual payload.
:param description: The user-visible description of JSON data
:returns: True on success, False otherwise
"""
if isinstance(json_data, dict):
json_data = json.dumps(json_data, default=str)
return bool(
lib.dc_send_webxdc_status_update(
self.account._dc_context,
self.id,
as_dc_charpointer(json_data),
as_dc_charpointer(description),
),
)
[docs]
def send_reaction(self, reaction: str):
"""Send a reaction to message and return the resulting Message instance."""
msg_id = lib.dc_send_reaction(self.account._dc_context, self.id, as_dc_charpointer(reaction))
if msg_id == 0:
raise ValueError("reaction could not be send")
return Message.from_db(self.account, msg_id)
[docs]
def get_reactions(self) -> Reactions:
"""Get :class:`deltachat.reactions.Reactions` to the message."""
return Reactions.from_msg(self)
[docs]
def is_system_message(self):
"""return True if this message is a system/info message."""
return bool(lib.dc_msg_is_info(self._dc_msg))
[docs]
def is_setup_message(self):
"""return True if this message is a setup message."""
return lib.dc_msg_is_setupmessage(self._dc_msg)
[docs]
def get_setupcodebegin(self) -> str:
"""return the first characters of a setup code in a setup message."""
return from_dc_charpointer(lib.dc_msg_get_setupcodebegin(self._dc_msg))
[docs]
def is_encrypted(self):
"""return True if this message was encrypted."""
return bool(lib.dc_msg_get_showpadlock(self._dc_msg))
[docs]
def is_bot(self):
"""return True if this message is submitted automatically."""
return bool(lib.dc_msg_is_bot(self._dc_msg))
[docs]
def is_forwarded(self):
"""return True if this message was forwarded."""
return bool(lib.dc_msg_is_forwarded(self._dc_msg))
[docs]
def get_message_info(self) -> str:
"""Return informational text for a single message.
The text is multiline and may contain eg. the raw text of the message.
"""
return from_dc_charpointer(lib.dc_get_msg_info(self.account._dc_context, self.id))
[docs]
def get_summarytext(self, width: int) -> str:
"""Get a message summary as a single line of text. Typically used for notifications."""
return from_dc_charpointer(lib.dc_msg_get_summarytext(self._dc_msg, width))
[docs]
def continue_key_transfer(self, setup_code):
"""extract key and use it as primary key for this account."""
res = lib.dc_continue_key_transfer(self.account._dc_context, self.id, as_dc_charpointer(setup_code))
if res == 0:
raise ValueError("could not decrypt")
@props.with_doc
def time_sent(self):
"""UTC time when the message was sent.
:returns: naive datetime.datetime() object.
"""
ts = lib.dc_msg_get_timestamp(self._dc_msg)
return datetime.fromtimestamp(ts, timezone.utc)
@props.with_doc
def time_received(self):
"""UTC time when the message was received.
:returns: naive datetime.datetime() object or None if message is an outgoing one.
"""
ts = lib.dc_msg_get_received_timestamp(self._dc_msg)
if ts:
return datetime.fromtimestamp(ts, timezone.utc)
return None
@props.with_doc
def ephemeral_timer(self):
"""Ephemeral timer in seconds.
:returns: timer in seconds or None if there is no timer
"""
timer = lib.dc_msg_get_ephemeral_timer(self._dc_msg)
if timer:
return timer
return None
@props.with_doc
def ephemeral_timestamp(self):
"""UTC time when the message will be deleted.
:returns: naive datetime.datetime() object or None if the timer is not started.
"""
ts = lib.dc_msg_get_ephemeral_timestamp(self._dc_msg)
if ts:
return datetime.fromtimestamp(ts, timezone.utc)
@property
def quoted_text(self) -> Optional[str]:
"""Text inside the quote.
:returns: Quoted text
"""
return from_optional_dc_charpointer(lib.dc_msg_get_quoted_text(self._dc_msg))
@property
def quote(self):
"""Quote getter.
:returns: Quoted message, if found in the database
"""
msg = lib.dc_msg_get_quoted_msg(self._dc_msg)
if msg:
return Message(self.account, ffi.gc(msg, lib.dc_msg_unref))
@quote.setter
def quote(self, quoted_message):
"""Quote setter."""
lib.dc_msg_set_quote(self._dc_msg, quoted_message._dc_msg)
[docs]
def force_plaintext(self) -> None:
"""Force the message to be sent in plain text."""
lib.dc_msg_force_plaintext(self._dc_msg)
@property
def error(self) -> Optional[str]:
"""Error message."""
return from_optional_dc_charpointer(lib.dc_msg_get_error(self._dc_msg))
@property
def chat(self):
"""chat this message was posted in.
:returns: :class:`deltachat.chat.Chat` object
"""
from .chat import Chat
chat_id = lib.dc_msg_get_chat_id(self._dc_msg)
return Chat(self.account, chat_id)
@props.with_doc
def override_sender_name(self) -> Optional[str]:
"""the name that should be shown over the message instead of the contact display name.
Usually used to impersonate someone else.
"""
return from_optional_dc_charpointer(lib.dc_msg_get_override_sender_name(self._dc_msg))
[docs]
def set_override_sender_name(self, name):
"""set different sender name for a message."""
lib.dc_msg_set_override_sender_name(self._dc_msg, as_dc_charpointer(name))
[docs]
def get_sender_chat(self):
"""return the 1:1 chat with the sender of this message.
:returns: :class:`deltachat.chat.Chat` instance
"""
return self.get_sender_contact().get_chat()
[docs]
def is_from_self(self):
"""Return true if the message is sent by self."""
contact_id = lib.dc_msg_get_from_id(self._dc_msg)
return contact_id == const.DC_CONTACT_ID_SELF
[docs]
def is_from_device(self):
"""Return true if the message is sent by the device."""
contact_id = lib.dc_msg_get_from_id(self._dc_msg)
return contact_id == const.DC_CONTACT_ID_DEVICE
#
# Message State query methods
#
@property
def _msgstate(self):
if self.id == 0:
dc_msg = self._dc_msg
else:
# load message from db to get a fresh/current state
dc_msg = ffi.gc(lib.dc_get_msg(self.account._dc_context, self.id), lib.dc_msg_unref)
# Message could be trashed, use the cached object if so.
if dc_msg == ffi.NULL:
dc_msg = self._dc_msg
return lib.dc_msg_get_state(dc_msg)
[docs]
def is_in_fresh(self):
"""return True if Message is incoming fresh message (un-noticed).
Fresh messages are not noticed nor seen and are typically
shown in notifications.
"""
return self._msgstate == const.DC_STATE_IN_FRESH
[docs]
def is_in_noticed(self):
"""Return True if Message is incoming and noticed.
Eg. chat opened but message not yet read - noticed messages
are not counted as unread but were not marked as read nor resulted in MDNs.
"""
return self._msgstate == const.DC_STATE_IN_NOTICED
[docs]
def is_in_seen(self):
"""Return True if Message is incoming, noticed and has been seen.
Eg. chat opened but message not yet read - noticed messages
are not counted as unread but were not marked as read nor resulted in MDNs.
"""
return self._msgstate == const.DC_STATE_IN_SEEN
[docs]
def is_outgoing(self):
"""Return True if Message is outgoing."""
return lib.dc_msg_get_state(self._dc_msg) in (
const.DC_STATE_OUT_PREPARING,
const.DC_STATE_OUT_PENDING,
const.DC_STATE_OUT_FAILED,
const.DC_STATE_OUT_MDN_RCVD,
const.DC_STATE_OUT_DELIVERED,
)
[docs]
def is_out_preparing(self):
"""Return True if Message is outgoing, but its file is being prepared."""
return self._msgstate == const.DC_STATE_OUT_PREPARING
[docs]
def is_out_pending(self):
"""Return True if Message is outgoing, but is pending (no single checkmark)."""
return self._msgstate == const.DC_STATE_OUT_PENDING
[docs]
def is_out_failed(self):
"""Return True if Message is unrecoverably failed."""
return self._msgstate == const.DC_STATE_OUT_FAILED
[docs]
def is_out_delivered(self):
"""Return True if Message was successfully delivered to the server (one checkmark).
Note, that already delivered messages may get into the state is_out_failed().
"""
return self._msgstate == const.DC_STATE_OUT_DELIVERED
[docs]
def is_out_mdn_received(self):
"""Return True if message was marked as read by the recipient(s) (two checkmarks;
this requires goodwill on the receiver's side). If a sent message changes to this
state, you'll receive the event DC_EVENT_MSG_READ.
"""
return self._msgstate == const.DC_STATE_OUT_MDN_RCVD
#
# Message type query methods
#
@property
def _view_type(self):
assert self.id > 0
return lib.dc_msg_get_viewtype(self._dc_msg)
[docs]
def is_text(self):
"""return True if it's a text message."""
return self._view_type == const.DC_MSG_TEXT
[docs]
def is_image(self):
"""return True if it's an image message."""
return self._view_type == const.DC_MSG_IMAGE
[docs]
def is_gif(self):
"""return True if it's a gif message."""
return self._view_type == const.DC_MSG_GIF
[docs]
def is_sticker(self):
"""return True if it's a sticker message."""
return self._view_type == const.DC_MSG_STICKER
[docs]
def is_audio(self):
"""return True if it's an audio message."""
return self._view_type == const.DC_MSG_AUDIO
[docs]
def is_video(self):
"""return True if it's a video message."""
return self._view_type == const.DC_MSG_VIDEO
[docs]
def is_videochat_invitation(self):
"""return True if it's a videochat invitation message."""
return self._view_type == const.DC_MSG_VIDEOCHAT_INVITATION
[docs]
def is_webxdc(self):
"""return True if it's a Webxdc message."""
return self._view_type == const.DC_MSG_WEBXDC
[docs]
def is_file(self):
"""return True if it's a file message."""
return self._view_type == const.DC_MSG_FILE
[docs]
def mark_seen(self):
"""mark this message as seen."""
self.account.mark_seen_messages([self.id])
#
# Message download state
#
@property
def download_state(self):
assert self.id > 0
# load message from db to get a fresh/current state
dc_msg = ffi.gc(lib.dc_get_msg(self.account._dc_context, self.id), lib.dc_msg_unref)
# Message could be trashed, use the cached object if so.
if dc_msg == ffi.NULL:
dc_msg = self._dc_msg
return lib.dc_msg_get_download_state(dc_msg)
def download_full(self) -> None:
lib.dc_download_full_msg(self.account._dc_context, self.id)
# some code for handling DC_MSG_* view types
_view_type_mapping = {
"text": const.DC_MSG_TEXT,
"image": const.DC_MSG_IMAGE,
"gif": const.DC_MSG_GIF,
"audio": const.DC_MSG_AUDIO,
"video": const.DC_MSG_VIDEO,
"file": const.DC_MSG_FILE,
"sticker": const.DC_MSG_STICKER,
"videochat": const.DC_MSG_VIDEOCHAT_INVITATION,
"webxdc": const.DC_MSG_WEBXDC,
}
def get_viewtype_code_from_name(view_type_name):
code = _view_type_mapping.get(view_type_name)
if code is not None:
return code
raise ValueError(
f"message typecode not found for {view_type_name!r}, available {list(_view_type_mapping.keys())!r}",
)
#
# some helper code for turning system messages into hook events
#
def map_system_message(msg):
if msg.is_system_message():
res = parse_system_add_remove(msg.text)
if not res:
return None
action, affected, actor = res
affected = msg.account.get_contact_by_addr(affected)
actor = None if actor == "me" else msg.account.get_contact_by_addr(actor)
d = {"chat": msg.chat, "contact": affected, "actor": actor, "message": msg}
return "ac_member_" + res[0], d
def extract_addr(text):
m = re.match(r".*\((.+@.+)\)", text)
if m:
text = m.group(1)
text = text.rstrip(".")
return text.strip()
def parse_system_add_remove(text):
"""return add/remove info from parsing the given system message text.
returns a (action, affected, actor) triple
"""
# You removed member a@b.
# You added member a@b.
# Member Me (x@y) removed by a@b.
# Member x@y added by a@b
# Member With space (tmp1@x.org) removed by tmp2@x.org.
# Member With space (tmp1@x.org) removed by Another member (tmp2@x.org).",
# Group left by some one (tmp1@x.org).
# Group left by tmp1@x.org.
text = text.lower()
m = re.match(r"member (.+) (removed|added) by (.+)", text)
if m:
affected, action, actor = m.groups()
return action, extract_addr(affected), extract_addr(actor)
m = re.match(r"you (removed|added) member (.+)", text)
if m:
action, affected = m.groups()
return action, extract_addr(affected), "me"
if text.startswith("group left by "):
addr = extract_addr(text[13:])
if addr:
return "removed", addr, addr