Source code for instagram_private_api.endpoints.media

import json
import re
import warnings
import time
from random import randint

from .common import ClientExperimentalWarning, MediaTypes
from ..utils import gen_user_breadcrumb
from ..compatpatch import ClientCompatPatch


class MediaEndpointsMixin(object):
    """For endpoints in ``/media/``."""

    def media_info(self, media_id):
        """
        Get media info

        :param media_id:
        :return:
        """
        endpoint = 'media/{media_id!s}/info/'.format(**{'media_id': media_id})
        res = self._call_api(endpoint)
        if self.auto_patch:
            [ClientCompatPatch.media(m, drop_incompat_keys=self.drop_incompat_keys)
             for m in res.get('items', [])]
        return res

    def medias_info(self, media_ids):
        """
        Get multiple media infos

        :param media_ids: list of media ids
        :return:
        """

        if isinstance(media_ids, str):
            media_ids = [media_ids]

        params = {
            '_uuid': self.uuid,
            '_csrftoken': self.csrftoken,
            'media_ids': ','.join(media_ids),
            'ranked_content': 'true',
            'include_inactive_reel': 'true',
        }
        res = self._call_api('media/infos/', query=params)
        if self.auto_patch:
            [ClientCompatPatch.media(m, drop_incompat_keys=self.drop_incompat_keys)
             for m in res.get('items', [])]
        return res

    def media_permalink(self, media_id):
        """
        Get media permalink

        :param media_id:
        :return:
        """
        endpoint = 'media/{media_id!s}/permalink/'.format(**{'media_id': media_id})
        res = self._call_api(endpoint)
        return res

    def media_comments(self, media_id, **kwargs):
        """
        Get media comments. Fixed at 20 comments returned per page.

        :param media_id: Media id
        :param kwargs:
            **max_id**: For pagination
        :return:
        """
        endpoint = 'media/{media_id!s}/comments/'.format(**{'media_id': media_id})
        query = {
            'can_support_threading': 'true'
        }
        if kwargs:
            query.update(kwargs)
        res = self._call_api(endpoint, query=query)

        if self.auto_patch:
            [ClientCompatPatch.comment(c, drop_incompat_keys=self.drop_incompat_keys)
             for c in res.get('comments', [])]
            [ClientCompatPatch.comment(c, drop_incompat_keys=self.drop_incompat_keys)
             for c in res.get('preview_comments', [])]
        return res

    def media_n_comments(self, media_id, n=150, reverse=False, **kwargs):
        """
        Helper method to retrieve n number of comments for a media id

        :param media_id: Media id
        :param n: Minimum number of comments to fetch
        :param reverse: Reverse list of comments (ordered by created_time)
        :param kwargs:
        :return:
        """

        endpoint = 'media/{media_id!s}/comments/'.format(**{'media_id': media_id})

        comments = []
        results = self._call_api(endpoint, query=kwargs)
        comments.extend(results.get('comments', []))

        while (((results.get('has_more_comments') and results.get('next_max_id'))
                or (results.get('has_more_headload_comments') and results.get('next_min_id')))
                and len(comments) < n):

            if results.get('has_more_comments'):
                kwargs.update({'max_id': results.get('next_max_id')})
            else:
                kwargs.update({'min_id': results.get('next_min_id')})

            results = self._call_api(endpoint, query=kwargs)
            comments.extend(results.get('comments', []))
            if not (results.get('next_max_id') or results.get('next_min_id') or results.get('comments')):
                # bail out if no max_id/min_id or comments returned
                break

        if self.auto_patch:
            [ClientCompatPatch.comment(c, drop_incompat_keys=self.drop_incompat_keys)
             for c in comments]

        return sorted(comments, key=lambda k: k['created_at_utc'], reverse=reverse)

    def comment_replies(self, media_id, comment_id, **kwargs):
        """
        Get comment replies. Fixed at 20 replies returned per page.
        Check for 'has_more_tail_child_comments', 'next_max_child_cursor' to determine
        if there are more replies to page through.

        :param media_id: Media id
        :param comment_id: Comment id
        :param kwargs:
            **max_id**: For pagination
        :return:
        """
        endpoint = 'media/{media_id!s}/comments/{comment_id!s}/child_comments/'.format(
            **{'media_id': media_id, 'comment_id': comment_id})
        res = self._call_api(endpoint, query=kwargs)

        if self.auto_patch:
            [ClientCompatPatch.comment(c, drop_incompat_keys=self.drop_incompat_keys)
             for c in res.get('child_comments', [])]
            ClientCompatPatch.comment(res.get('parent_comment'))
        return res

    def comment_inline_replies(self, media_id, comment_id, max_id, **kwargs):
        """
        Get inline comment replies.
        Check for 'next_max_child_cursor' from ``media_comments()``
        to determine if there are inline comment replies to retrieve.

        :param media_id: Media id
        :param comment_id: Comment id
        :param max_id: The comment's 'next_max_child_cursor' value from``media_comments()``
        :return:
        """
        endpoint = 'media/{media_id!s}/comments/{comment_id!s}/inline_child_comments/'.format(
            **{'media_id': media_id, 'comment_id': comment_id})
        query = {'max_id': max_id}
        if kwargs:
            query.update(kwargs)
        res = self._call_api(endpoint, query=query)
        if self.auto_patch:
            [ClientCompatPatch.comment(c, drop_incompat_keys=self.drop_incompat_keys)
             for c in res.get('child_comments', [])]
            ClientCompatPatch.comment(res.get('parent_comment'))
        return res

    def edit_media(self, media_id, caption, usertags=None):
        """
        Edit a media's caption

        :param media_id: Media id
        :param caption: Caption text
        :param usertags: array of user_ids and positions in the format below:

            .. code-block:: javascript

                usertags = [
                    {"user_id":4292127751, "position":[0.625347,0.4384531]}
                ]
        :return:
        """
        if usertags is None:
            usertags = []
        endpoint = 'media/{media_id!s}/edit_media/'.format(**{'media_id': media_id})
        params = {'caption_text': caption}
        params.update(self.authenticated_params)
        if usertags:
            utags = {'in': [{'user_id': u['user_id'], 'position': u['position']} for u in usertags]}
            params['usertags'] = json.dumps(utags, separators=(',', ':'))
        res = self._call_api(endpoint, params=params)
        if self.auto_patch:
            ClientCompatPatch.media(res.get('media'))
        return res

    def delete_media(self, media_id):
        """
        Delete a media

        :param media_id: Media id
        :return:
            .. code-block:: javascript

                {"status": "ok", "did_delete": true}
        """
        endpoint = 'media/{media_id!s}/delete/'.format(**{'media_id': media_id})
        params = {'media_id': media_id}
        params.update(self.authenticated_params)
        return self._call_api(endpoint, params=params)

    def post_comment(self, media_id, comment_text):
        """
        Post a comment.
        Comment text validation according to https://www.instagram.com/developer/endpoints/comments/#post_media_comments

        :param media_id: Media id
        :param comment_text: Comment text
        :return:
            .. code-block:: javascript

                {
                  "comment": {
                    "status": "Active",
                    "media_id": 123456789,
                    "text": ":)",
                    "created_at": 1479453671.0,
                    "user": {
                      "username": "x",
                      "has_anonymous_profile_picture": false,
                      "profile_pic_url": "http://scontent-sit4-1.cdninstagram.com/abc.jpg",
                      "full_name": "x",
                      "pk": 123456789,
                      "is_verified": false,
                      "is_private": false
                    },
                    "content_type": "comment",
                    "created_at_utc": 1479482471,
                    "pk": 17865505612040669,
                    "type": 0
                  },
                  "status": "ok"
                }
        """

        if len(comment_text) > 300:
            raise ValueError('The total length of the comment cannot exceed 300 characters.')
        if re.search(r'[a-z]+', comment_text, re.IGNORECASE) and comment_text == comment_text.upper():
            raise ValueError('The comment cannot consist of all capital letters.')
        if len(re.findall(r'#[^#]+\b', comment_text, re.UNICODE | re.MULTILINE)) > 4:
            raise ValueError('The comment cannot contain more than 4 hashtags.')
        if len(re.findall(r'\bhttps?://\S+\.\S+', comment_text)) > 1:
            raise ValueError('The comment cannot contain more than 1 URL.')

        endpoint = 'media/{media_id!s}/comment/'.format(**{'media_id': media_id})
        params = {
            'comment_text': comment_text,
            'user_breadcrumb': gen_user_breadcrumb(len(comment_text)),
            'idempotence_token': self.generate_uuid(),
            'containermodule': 'comments_feed_timeline',
            'radio_type': self.radio_type,
        }
        params.update(self.authenticated_params)
        res = self._call_api(endpoint, params=params)
        if self.auto_patch:
            ClientCompatPatch.comment(res['comment'], drop_incompat_keys=self.drop_incompat_keys)
        return res

    def delete_comment(self, media_id, comment_id):
        """
        Delete a comment

        :param media_id: Media id
        :param comment_id: Comment id
        :return:
            .. code-block:: javascript

                {"status": "ok"}
        """
        endpoint = 'media/{media_id!s}/comment/{comment_id!s}/delete/'.format(**{
            'media_id': media_id, 'comment_id': comment_id})
        params = {}
        params.update(self.authenticated_params)
        res = self._call_api(endpoint, params=params)
        return res

    def bulk_delete_comments(self, media_id, comment_ids):
        """
        Bulk delete comment

        :param media_id: Media id
        :param comment_ids: List of comment ids
        :return:
            .. code-block:: javascript

                {"status": "ok"}
        """
        if not isinstance(comment_ids, list):
            comment_ids = [comment_ids]
        endpoint = 'media/{media_id!s}/comment/bulk_delete/'.format(**{
            'media_id': media_id})
        params = {
            'comment_ids_to_delete': ','.join(
                [str(comment_id) for comment_id in comment_ids])
        }
        params.update(self.authenticated_params)
        res = self._call_api(endpoint, params=params)
        return res

    def media_likers(self, media_id, **kwargs):
        """
        Get users who have liked a post

        :param media_id:
        :return:
        """
        endpoint = 'media/{media_id!s}/likers/'.format(**{'media_id': media_id})
        res = self._call_api(endpoint, query=kwargs)
        if self.auto_patch:
            [ClientCompatPatch.list_user(u, drop_incompat_keys=self.drop_incompat_keys)
             for u in res.get('users', [])]
        return res

    def media_likers_chrono(self, media_id):
        """
        EXPERIMENTAL ENDPOINT, INADVISABLE TO USE.
        List the users who have liked a post, in chronological order.
        Emits a ``ClientExperimentalWarning`` on every call.

        :param media_id: Media id
        :return: The API response. When ``auto_patch`` is enabled, every entry
            of ``users`` is passed through ``ClientCompatPatch.list_user`` first.
        """
        warnings.warn('This endpoint is experimental. Do not use.', ClientExperimentalWarning)
        endpoint = 'media/{0!s}/likers_chrono/'.format(media_id)
        res = self._call_api(endpoint)
        if not self.auto_patch:
            return res
        for user in res.get('users', []):
            ClientCompatPatch.list_user(user, drop_incompat_keys=self.drop_incompat_keys)
        return res

    def post_like(self, media_id, module_name='feed_timeline'):
        """
        Like a post

        :param media_id: Media id
        :param module_name: Example: 'feed_timeline', 'video_view', 'photo_view'
        :return:
            .. code-block:: javascript

                {"status": "ok"}
        """
        endpoint = 'media/{media_id!s}/like/'.format(**{'media_id': media_id})
        params = {
            'media_id': media_id,
            'module_name': module_name,
            'radio_type': self.radio_type,
        }
        params.update(self.authenticated_params)
        # d query param = flag for double tap
        res = self._call_api(endpoint, params=params, query={'d': '1'})
        return res

    def delete_like(self, media_id, module_name='feed_timeline'):
        """
        Unlike a post

        :param media_id:
        :param module_name: Example: 'feed_timeline', 'video_view', 'photo_view'
        :return:
            .. code-block:: javascript

                {"status": "ok"}
        """
        endpoint = 'media/{media_id!s}/unlike/'.format(**{'media_id': media_id})
        params = {
            'media_id': media_id,
            'module_name': module_name,
            'radio_type': self.radio_type,
        }
        params.update(self.authenticated_params)
        res = self._call_api(endpoint, params=params)
        return res

    def media_seen(self, reels):
        """
        Mark multiple stories as seen

        :param reels: A list of reel media objects, or a dict of media_ids and timings
            as defined below.

            .. code-block:: javascript

                {
                    "1309763051087626108_124317_124317": ["1470355944_1470372029"],
                    "1309764045355643149_124317_124317": ["1470356063_1470372039"],
                    "1309818450243415912_124317_124317": ["1470362548_1470372060"],
                    "1309764653429046112_124317_124317": ["1470356135_1470372049"],
                    "1309209597843679372_124317_124317": ["1470289967_1470372013"]
                }

                where
                    1309763051087626108_124317 = <media_id>,
                    124317 = <media.owner_id>
                    1470355944_1470372029 is <media_created_time>_<view_time>

        :return:
        """
        if isinstance(reels, list):
            # is a list of reel media
            reels_seen = {}
            reels = sorted(reels, key=lambda m: m['taken_at'], reverse=True)
            now = int(time.time())
            for i, reel in enumerate(reels):
                reel_seen_at = now - min(i + 1 + randint(0, 2), max(0, now - reel['taken_at']))
                reels_seen['{0!s}_{1!s}'.format(reel['id'], reel['user']['pk'])] = [
                    '{0!s}_{1!s}'.format(reel['taken_at'], reel_seen_at)]
            params = {'reels': reels_seen}
        else:
            params = {'reels': reels}
        params.update(self.authenticated_params)
        res = self._call_api('media/seen/', params=params, version='v2')
        return res

    def comment_like(self, comment_id):
        """
        Like a comment

        :param comment_id:

        :return:
            .. code-block:: javascript

                {"status": "ok"}
        """
        endpoint = 'media/{comment_id!s}/comment_like/'.format(**{'comment_id': comment_id})
        params = self.authenticated_params
        return self._call_api(endpoint, params=params)

    def comment_likers(self, comment_id):
        """
        Get users who have liked a comment

        :param comment_id:
        :return:
        """
        endpoint = 'media/{comment_id!s}/comment_likers/'.format(**{'comment_id': comment_id})
        res = self._call_api(endpoint)
        if self.auto_patch:
            [ClientCompatPatch.list_user(u, drop_incompat_keys=self.drop_incompat_keys)
             for u in res.get('users', [])]
        return res

    def comment_unlike(self, comment_id):
        """
        Unlike a comment

        :param comment_id:
        :return:
            .. code-block:: javascript

                {"status": "ok"}
        """
        endpoint = 'media/{comment_id!s}/comment_unlike/'.format(**{'comment_id': comment_id})
        params = self.authenticated_params
        return self._call_api(endpoint, params=params)

    def save_photo(self, media_id, added_collection_ids=None):
        """
        Save a photo

        :param media_id: Media id
        :param added_collection_ids: optional list of collection IDs to add the media to
        :return:
            .. code-block:: javascript

                {"status": "ok"}
        """
        endpoint = 'media/{media_id!s}/save/'.format(**{'media_id': media_id})
        params = {'radio_type': self.radio_type}
        if added_collection_ids:
            if isinstance(added_collection_ids, str):
                added_collection_ids = [added_collection_ids]
            params['added_collection_ids'] = json.dumps(added_collection_ids, separators=(',', ':'))
        params.update(self.authenticated_params)
        return self._call_api(endpoint, params=params)

    def unsave_photo(self, media_id, removed_collection_ids=None):
        """
        Unsave a photo

        :param media_id:
        :param removed_collection_ids: optional list of collection IDs to remove the media from
        :return:
            .. code-block:: javascript

                {"status": "ok"}
        """
        endpoint = 'media/{media_id!s}/unsave/'.format(**{'media_id': media_id})
        params = {'radio_type': self.radio_type}
        if removed_collection_ids:
            if isinstance(removed_collection_ids, str):
                removed_collection_ids = [removed_collection_ids]
            params['removed_collection_ids'] = json.dumps(removed_collection_ids, separators=(',', ':'))
        params.update(self.authenticated_params)
        return self._call_api(endpoint, params=params)

    def disable_comments(self, media_id):
        """
        Disable comments for a media

        :param media_id:
        :return:
            .. code-block:: javascript

                {"status": "ok"}
        """
        endpoint = 'media/{media_id!s}/disable_comments/'.format(**{'media_id': media_id})
        params = {
            '_csrftoken': self.csrftoken,
            '_uuid': self.uuid,
        }
        res = self._call_api(endpoint, params=params, unsigned=True)
        return res

    def enable_comments(self, media_id):
        """
        Enable comments for a media

        :param media_id:
        :return:
            .. code-block:: javascript

                {"status": "ok"}
        """

        endpoint = 'media/{media_id!s}/enable_comments/'.format(**{'media_id': media_id})
        params = {
            '_csrftoken': self.csrftoken,
            '_uuid': self.uuid,
        }
        res = self._call_api(endpoint, params=params, unsigned=True)
        return res

    def media_only_me(self, media_id, media_type, undo=False):
        """
        Archive a media so that only its owner can view it, or unarchive it
        again when ``undo`` is set.

        :param media_id: Media id
        :param media_type: One of :attr:`MediaTypes.PHOTO`, :attr:`MediaTypes.VIDEO`, or :attr:`MediaTypes.CAROUSEL`
        :param undo: bool. If true, the ``undo_only_me`` endpoint is called
            instead of ``only_me``

        :raises ValueError: if ``media_type`` is not in ``MediaTypes.ALL``
        :return:
            .. code-block:: javascript

                {"status": "ok"}
        """
        if media_type not in MediaTypes.ALL:
            raise ValueError('Invalid media type.')

        action = 'undo_only_me' if undo else 'only_me'
        payload = dict(media_id=media_id)
        payload.update(self.authenticated_params)
        return self._call_api(
            'media/{0!s}/{1!s}/'.format(media_id, action),
            params=payload,
            query={'media_type': media_type})

    def media_undo_only_me(self, media_id, media_type):
        """
        Undo making a media only me.

        :param media_id:
        :param media_type: One of :attr:`MediaTypes.PHOTO`, :attr:`MediaTypes.VIDEO`, or :attr:`MediaTypes.CAROUSEL`
        """
        return self.media_only_me(media_id, media_type, undo=True)

    def story_viewers(self, story_pk, **kwargs):
        """
        Get list of story viewers

        :param story_pk: Story media's PK identifier, e.g. "1700000123"
        :param kwargs:
            **max_id**: For pagination
        :return:
        """
        endpoint = 'media/{story_pk!s}/list_reel_media_viewer/'.format(
            story_pk=story_pk)
        return self._call_api(endpoint, query=kwargs)