Source code for deltachat.message

""" The Message object. """

import os
import re
from . import props
from .cutil import from_dc_charpointer, as_dc_charpointer
from .capi import lib, ffi
from . import const
from datetime import datetime


[docs]class Message(object): """ Message object. You obtain instances of it through :class:`deltachat.account.Account` or :class:`deltachat.chat.Chat`. """ def __init__(self, account, dc_msg): self.account = account assert isinstance(self.account._dc_context, ffi.CData) assert isinstance(dc_msg, ffi.CData) assert dc_msg != ffi.NULL self._dc_msg = dc_msg self.id = lib.dc_msg_get_id(dc_msg) assert self.id is not None and self.id >= 0, repr(self.id) def __eq__(self, other): return self.account == other.account and self.id == other.id def __repr__(self): c = self.get_sender_contact() typ = "outgoing" if self.is_outgoing() else "incoming" return "<Message {} sys={} {} id={} sender={}/{} chat={}/{}>".format( typ, self.is_system_message(), repr(self.text[:10]), self.id, c.id, c.addr, self.chat.id, self.chat.get_name()) @classmethod def from_db(cls, account, id): assert id > 0 return cls(account, ffi.gc( lib.dc_get_msg(account._dc_context, id), lib.dc_msg_unref ))
[docs] @classmethod def new_empty(cls, account, view_type): """ create a non-persistent message. :param: view_type is "text", "audio", "video", "file" """ view_type_code = get_viewtype_code_from_name(view_type) return Message(account, ffi.gc( lib.dc_msg_new(account._dc_context, view_type_code), lib.dc_msg_unref ))
[docs] def create_chat(self): """ create or get an existing chat (group) object for this message. If the message is a deaddrop contact request the sender will become an accepted contact. :returns: a :class:`deltachat.chat.Chat` object. """ from .chat import Chat chat_id = lib.dc_create_chat_by_msg_id(self.account._dc_context, self.id) ctx = self.account._dc_context self._dc_msg = ffi.gc(lib.dc_get_msg(ctx, self.id), lib.dc_msg_unref) return Chat(self.account, chat_id)
@props.with_doc def text(self): """unicode text of this messages (might be empty if not a text message). """ return from_dc_charpointer(lib.dc_msg_get_text(self._dc_msg))
[docs] def set_text(self, text): """set text of this message. """ lib.dc_msg_set_text(self._dc_msg, as_dc_charpointer(text))
@props.with_doc def filename(self): """filename if there was an attachment, otherwise empty string. """ return from_dc_charpointer(lib.dc_msg_get_file(self._dc_msg))
[docs] def set_file(self, path, mime_type=None): """set file for this message from path and mime_type. """ mtype = ffi.NULL if mime_type is None else as_dc_charpointer(mime_type) if not os.path.exists(path): raise ValueError("path does not exist: {!r}".format(path)) lib.dc_msg_set_file(self._dc_msg, as_dc_charpointer(path), mtype)
@props.with_doc def basename(self): """basename of the attachment if it exists, otherwise empty string. """ return from_dc_charpointer(lib.dc_msg_get_filename(self._dc_msg)) @props.with_doc def filemime(self): """mime type of the file (if it exists)""" return from_dc_charpointer(lib.dc_msg_get_filemime(self._dc_msg))
[docs] def is_system_message(self): """ return True if this message is a system/info message. """ return bool(lib.dc_msg_is_info(self._dc_msg))
[docs] def is_setup_message(self): """ return True if this message is a setup message. """ return lib.dc_msg_is_setupmessage(self._dc_msg)
[docs] def get_setupcodebegin(self): """ return the first characters of a setup code in a setup message. """ return from_dc_charpointer(lib.dc_msg_get_setupcodebegin(self._dc_msg))
[docs] def is_encrypted(self): """ return True if this message was encrypted. """ return bool(lib.dc_msg_get_showpadlock(self._dc_msg))
[docs] def is_forwarded(self): """ return True if this message was forwarded. """ return bool(lib.dc_msg_is_forwarded(self._dc_msg))
[docs] def get_message_info(self): """ Return informational text for a single message. The text is multiline and may contain eg. the raw text of the message. """ return from_dc_charpointer(lib.dc_get_msg_info(self.account._dc_context, self.id))
[docs] def continue_key_transfer(self, setup_code): """ extract key and use it as primary key for this account. """ res = lib.dc_continue_key_transfer( self.account._dc_context, self.id, as_dc_charpointer(setup_code) ) if res == 0: raise ValueError("could not decrypt")
@props.with_doc def time_sent(self): """UTC time when the message was sent. :returns: naive datetime.datetime() object. """ ts = lib.dc_msg_get_timestamp(self._dc_msg) return datetime.utcfromtimestamp(ts) @props.with_doc def time_received(self): """UTC time when the message was received. :returns: naive datetime.datetime() object or None if message is an outgoing one. """ ts = lib.dc_msg_get_received_timestamp(self._dc_msg) if ts: return datetime.utcfromtimestamp(ts) @props.with_doc def ephemeral_timer(self): """Ephemeral timer in seconds :returns: timer in seconds or None if there is no timer """ timer = lib.dc_msg_get_ephemeral_timer(self._dc_msg) if timer: return timer @props.with_doc def ephemeral_timestamp(self): """UTC time when the message will be deleted. :returns: naive datetime.datetime() object or None if the timer is not started. """ ts = lib.dc_msg_get_ephemeral_timestamp(self._dc_msg) if ts: return datetime.utcfromtimestamp(ts)
[docs] def get_mime_headers(self): """ return mime-header object for an incoming message. This only returns a non-None object if ``save_mime_headers`` config option was set and the message is incoming. :returns: email-mime message object (with headers only, no body). """ import email.parser mime_headers = lib.dc_get_mime_headers(self.account._dc_context, self.id) if mime_headers: s = ffi.string(ffi.gc(mime_headers, lib.dc_str_unref)) if isinstance(s, bytes): return email.message_from_bytes(s) return email.message_from_string(s)
@property def chat(self): """chat this message was posted in. :returns: :class:`deltachat.chat.Chat` object """ from .chat import Chat chat_id = lib.dc_msg_get_chat_id(self._dc_msg) return Chat(self.account, chat_id)
[docs] def get_sender_chat(self): """return the 1:1 chat with the sender of this message. :returns: :class:`deltachat.chat.Chat` instance """ return self.get_sender_contact().get_chat()
[docs] def get_sender_contact(self): """return the contact of who wrote the message. :returns: :class:`deltachat.chat.Contact` instance """ from .contact import Contact contact_id = lib.dc_msg_get_from_id(self._dc_msg) return Contact(self.account, contact_id)
# # Message State query methods # @property def _msgstate(self): if self.id == 0: dc_msg = self._dc_msg else: # load message from db to get a fresh/current state dc_msg = ffi.gc( lib.dc_get_msg(self.account._dc_context, self.id), lib.dc_msg_unref ) return lib.dc_msg_get_state(dc_msg)
[docs] def is_in_fresh(self): """ return True if Message is incoming fresh message (un-noticed). Fresh messages are not noticed nor seen and are typically shown in notifications. """ return self._msgstate == const.DC_STATE_IN_FRESH
[docs] def is_in_noticed(self): """Return True if Message is incoming and noticed. Eg. chat opened but message not yet read - noticed messages are not counted as unread but were not marked as read nor resulted in MDNs. """ return self._msgstate == const.DC_STATE_IN_NOTICED
[docs] def is_in_seen(self): """Return True if Message is incoming, noticed and has been seen. Eg. chat opened but message not yet read - noticed messages are not counted as unread but were not marked as read nor resulted in MDNs. """ return self._msgstate == const.DC_STATE_IN_SEEN
[docs] def is_outgoing(self): """Return True if Message is outgoing. """ return self._msgstate in ( const.DC_STATE_OUT_PREPARING, const.DC_STATE_OUT_PENDING, const.DC_STATE_OUT_FAILED, const.DC_STATE_OUT_MDN_RCVD, const.DC_STATE_OUT_DELIVERED)
[docs] def is_out_preparing(self): """Return True if Message is outgoing, but its file is being prepared. """ return self._msgstate == const.DC_STATE_OUT_PREPARING
[docs] def is_out_pending(self): """Return True if Message is outgoing, but is pending (no single checkmark). """ return self._msgstate == const.DC_STATE_OUT_PENDING
[docs] def is_out_failed(self): """Return True if Message is unrecoverably failed. """ return self._msgstate == const.DC_STATE_OUT_FAILED
[docs] def is_out_delivered(self): """Return True if Message was successfully delivered to the server (one checkmark). Note, that already delivered messages may get into the state is_out_failed(). """ return self._msgstate == const.DC_STATE_OUT_DELIVERED
[docs] def is_out_mdn_received(self): """Return True if message was marked as read by the recipient(s) (two checkmarks; this requires goodwill on the receiver's side). If a sent message changes to this state, you'll receive the event DC_EVENT_MSG_READ. """ return self._msgstate == const.DC_STATE_OUT_MDN_RCVD
# # Message type query methods # @property def _view_type(self): assert self.id > 0 return lib.dc_msg_get_viewtype(self._dc_msg)
[docs] def is_text(self): """ return True if it's a text message. """ return self._view_type == const.DC_MSG_TEXT
[docs] def is_image(self): """ return True if it's an image message. """ return self._view_type == const.DC_MSG_IMAGE
[docs] def is_gif(self): """ return True if it's a gif message. """ return self._view_type == const.DC_MSG_GIF
[docs] def is_audio(self): """ return True if it's an audio message. """ return self._view_type == const.DC_MSG_AUDIO
[docs] def is_video(self): """ return True if it's a video message. """ return self._view_type == const.DC_MSG_VIDEO
[docs] def is_file(self): """ return True if it's a file message. """ return self._view_type == const.DC_MSG_FILE
[docs] def mark_seen(self): """ mark this message as seen. """ self.account.mark_seen_messages([self.id])
# some code for handling DC_MSG_* view types _view_type_mapping = { const.DC_MSG_TEXT: 'text', const.DC_MSG_IMAGE: 'image', const.DC_MSG_GIF: 'gif', const.DC_MSG_AUDIO: 'audio', const.DC_MSG_VIDEO: 'video', const.DC_MSG_FILE: 'file' } def get_viewtype_code_from_name(view_type_name): for code, value in _view_type_mapping.items(): if value == view_type_name: return code raise ValueError("message typecode not found for {!r}, " "available {!r}".format(view_type_name, list(_view_type_mapping.values()))) # # some helper code for turning system messages into hook events # def map_system_message(msg): if msg.is_system_message(): res = parse_system_add_remove(msg.text) if not res: return action, affected, actor = res affected = msg.account.get_contact_by_addr(affected) if actor == "me": actor = None else: actor = msg.account.get_contact_by_addr(actor) d = dict(chat=msg.chat, contact=affected, actor=actor, message=msg) return "ac_member_" + res[0], d def extract_addr(text): m = re.match(r'.*\((.+@.+)\)', text) if m: text = m.group(1) text = text.rstrip(".") return text.strip() def parse_system_add_remove(text): """ return add/remove info from parsing the given system message text. returns a (action, affected, actor) triple """ # Member Me (x@y) removed by a@b. # Member x@y added by a@b # Member With space (tmp1@x.org) removed by tmp2@x.org. # Member With space (tmp1@x.org) removed by Another member (tmp2@x.org).", # Group left by some one (tmp1@x.org). # Group left by tmp1@x.org. text = text.lower() m = re.match(r'member (.+) (removed|added) by (.+)', text) if m: affected, action, actor = m.groups() return action, extract_addr(affected), extract_addr(actor) if text.startswith("group left by "): addr = extract_addr(text[13:]) if addr: return "removed", addr, addr