Module TikTokApi.browser

Expand source code
import random
import time
import string
import requests
import logging
from threading import Thread
import time
import datetime
import random


# Import Detection From Stealth
from .stealth import stealth
from .get_acrawler import get_acrawler
from playwright.sync_api import sync_playwright

# Module-wide Playwright driver; created lazily by get_playwright().
playwright = None


def get_playwright():
    """Return the module-wide Playwright driver, starting it on first use.

    The driver is stored in the module-level ``playwright`` variable, so
    later calls return the same object. Any exception raised while starting
    the driver propagates to the caller and leaves the variable unset.
    """
    global playwright
    if playwright is not None:
        return playwright

    playwright = sync_playwright().start()
    return playwright


class browser:
    """Headless WebKit session (driven by Playwright) used to sign request URLs.

    Keyword arguments read by the constructor:
        debug: stored on ``self.debug`` (default ``False``).
        proxy: proxy URL, optionally carrying credentials in the form
            ``scheme://user:password@host:port``.
        api_url: stored on ``self.api_url``.
        referer: stored on ``self.referrer``
            (default ``"https://www.tiktok.com/"``).
        language: stored on ``self.language`` (default ``"en"``).
        executablePath: path of the browser executable to launch.
        custom_did: default device id used by :meth:`sign_url`.
        browser_args: extra command-line arguments for the browser.
        browser_options: extra launch options; they override the defaults
            built here (including the proxy settings).
    """

    def __init__(
        self,
        **kwargs,
    ):
        """Launch the browser and record the screen size of a fresh page.

        Raises:
            Exception: whatever Playwright raises when the launch fails; the
                error is logged at CRITICAL level before being re-raised.
        """
        self.debug = kwargs.get("debug", False)
        self.proxy = kwargs.get("proxy", None)
        self.api_url = kwargs.get("api_url", None)
        self.referrer = kwargs.get("referer", "https://www.tiktok.com/")
        self.language = kwargs.get("language", "en")
        self.executablePath = kwargs.get("executablePath", None)
        self.did = kwargs.get("custom_did", None)

        # A missing, empty or ``None`` value means "nothing extra".
        self.args = kwargs.get("browser_args") or []
        options = kwargs.get("browser_options") or {}

        self.options = {
            "headless": True,
            "handle_sigint": True,
            "handle_sigterm": True,
            "handle_sighup": True,
        }

        if self.proxy is not None:
            self.options["proxy"] = self._proxy_options(self.proxy)

        # Caller-supplied options win over the defaults above.
        self.options.update(options)

        if self.executablePath is not None:
            # Playwright's Python launch options are snake_case, like the
            # handle_* keys above; the camelCase key is not accepted.
            self.options["executable_path"] = self.executablePath

        try:
            self.browser = get_playwright().webkit.launch(
                args=self.args, **self.options
            )
        except Exception as e:
            # Log first: a statement placed after ``raise`` never runs.
            logging.critical(e)
            raise

        context = self.create_context(set_useragent=True)
        try:
            self.get_params(context.new_page())
        finally:
            # Release the temporary context even if probing the page fails.
            context.close()

    @staticmethod
    def _proxy_options(proxy):
        """Translate a proxy URL into Playwright's ``proxy`` launch option.

        ``scheme://user:password@host:port`` becomes a dict with ``server``,
        ``username`` and ``password`` keys; a URL without ``@`` is returned
        as ``{"server": proxy}``.
        """
        if "@" not in proxy:
            return {"server": proxy}

        scheme, separator, remainder = proxy.partition("://")
        if not separator:
            # No scheme present: the whole string is "credentials@address".
            scheme, remainder = "", proxy

        # Split on the LAST "@" and the FIRST ":" so that passwords which
        # themselves contain "@" or ":" are kept intact.
        credentials, _, address = remainder.rpartition("@")
        username, _, password = credentials.partition(":")
        return {
            "server": scheme + separator + address,
            "username": username,
            "password": password,
        }

    def get_params(self, page) -> None:
        """Record browser fingerprint values on the instance.

        The language, timezone, platform, name and version fields are
        currently left blank (the page queries that used to fill them are
        disabled); only the screen width and height are read from ``page``.
        """
        self.browser_language = ""
        self.timezone_name = ""
        self.browser_platform = ""
        self.browser_name = ""
        self.browser_version = ""

        self.width = page.evaluate("""() => { return screen.width; }""")
        self.height = page.evaluate("""() => { return screen.height; }""")

    def create_context(self, set_useragent=False):
        """Open a new browser context based on the "iPhone 11 Pro" descriptor.

        The viewport, device scale factor, mobile flag and touch flag are
        randomised on every call.

        Args:
            set_useragent: when true, store the descriptor's user agent on
                ``self.userAgent``.

        Returns:
            The context returned by ``self.browser.new_context``.
        """
        # Work on a copy so the randomised values never leak into the
        # descriptor held by Playwright (it may be shared between calls).
        device = dict(get_playwright().devices["iPhone 11 Pro"])
        device["viewport"] = {
            "width": random.randint(320, 1920),
            "height": random.randint(320, 1920),
        }
        device["device_scale_factor"] = random.randint(1, 3)
        device["is_mobile"] = random.randint(1, 2) == 1
        device["has_touch"] = random.randint(1, 2) == 1

        context = self.browser.new_context(**device)
        if set_useragent:
            self.userAgent = device["user_agent"]

        return context

    def base36encode(self, number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
        """Converts an integer to a base36 string.

        The base is ``len(alphabet)``, so a different alphabet gives a
        different base. Negative numbers are prefixed with ``-``.
        """
        base36 = ""
        sign = ""

        if number < 0:
            sign = "-"
            number = -number

        # Single-digit values (including zero) need no division loop.
        if 0 <= number < len(alphabet):
            return sign + alphabet[number]

        while number != 0:
            number, i = divmod(number, len(alphabet))
            base36 = alphabet[i] + base36

        return sign + base36

    def gen_verifyFp(self):
        """Build a ``verify_<timestamp>_<id>`` fingerprint string.

        ``<timestamp>`` is the current time in milliseconds, base36-encoded
        and lower-cased. ``<id>`` is 36 characters long: underscores at
        offsets 8, 13, 18 and 23, a literal ``4`` at offset 14, and random
        alphanumerics elsewhere (offset 19 is limited to ``8``, ``9``, ``A``
        or ``B``).
        """
        chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
        chars_len = len(chars)
        scenario_title = self.base36encode(int(time.time() * 1000))
        uuid = [0] * 36
        uuid[8] = "_"
        uuid[13] = "_"
        uuid[18] = "_"
        uuid[23] = "_"
        uuid[14] = "4"

        for i in range(36):
            if uuid[i] != 0:
                # Position already holds a fixed character.
                continue
            r = int(random.random() * chars_len)
            # Offset 19 is squeezed into indexes 8..11 of ``chars``.
            uuid[i] = chars[((3 & r) | 8) if i == 19 else r]

        return f'verify_{scenario_title.lower()}_{"".join(uuid)}'

    def sign_url(self, **kwargs):
        """Sign a URL with the in-page ``byted_acrawler`` script.

        Keyword Args:
            url: the URL to sign (required).
            gen_new_verifyFp: when true, generate a fresh fingerprint with
                :meth:`gen_verifyFp`.
            custom_verifyFp: fingerprint to use otherwise; a built-in value
                is used when it is not supplied.
            custom_did: device id for this call; falls back to the instance
                value, then to a random number.

        Returns:
            A ``(verifyFp, did, signature)`` tuple where ``did`` is a string
            and ``signature`` is whatever ``window.byted_acrawler.sign``
            returned.

        Raises:
            Exception: if ``url`` is missing.
        """
        url = kwargs.get("url", None)
        if url is None:
            raise Exception("sign_url required a url parameter")

        if kwargs.get("gen_new_verifyFp", False):
            verifyFp = self.gen_verifyFp()
        else:
            verifyFp = kwargs.get(
                "custom_verifyFp",
                "verify_khgp4f49_V12d4mRX_MdCO_4Wzt_Ar0k_z4RCQC9pUDpX",
            )

        if kwargs.get("custom_did") is not None:
            did = kwargs.get("custom_did")
        elif self.did is None:
            did = random.randint(10000, 999999999)
        else:
            did = self.did
        # Callers may hand in an integer id; the query string needs text.
        did = str(did)

        full_url = url + "&verifyFp=" + verifyFp + "&did=" + did

        context = self.create_context()
        try:
            page = context.new_page()
            page.set_content("<script> " + get_acrawler() + " </script>")
            # Pass the URL as an argument rather than splicing it into the
            # script text, so quotes or backslashes in it cannot break (or
            # alter) the JavaScript being evaluated.
            evaluatedPage = page.evaluate(
                "(url) => window.byted_acrawler.sign({url: url})", full_url
            )
        finally:
            # Always release the context, even when signing fails.
            context.close()
        return (
            verifyFp,
            did,
            evaluatedPage,
        )

    def clean_up(self):
        """Close the browser, logging (not raising) any failure."""
        try:
            self.browser.close()
        except Exception:
            # Best-effort shutdown: keep the original message but attach the
            # traceback so a failed close can be diagnosed.
            logging.info("cleanup failed", exc_info=True)
        # NOTE(review): the module-level Playwright driver is not stopped
        # here - presumably because it may be shared; confirm.

    def find_redirect(self, url):
        """Load ``url`` and store the final page URL on ``self.redirect_url``.

        If the instance has a ``page`` attribute it is used; otherwise a
        temporary context and page are created and closed again afterwards.
        """
        page = getattr(self, "page", None)
        context = None
        if page is None:
            # Nothing in this class assigns ``self.page``, so open a page.
            context = self.create_context()
            page = context.new_page()
        try:
            # Playwright takes the wait condition as a keyword argument.
            page.goto(url, wait_until="load")
            self.redirect_url = page.url
        finally:
            if context is not None:
                context.close()

    def __format_proxy(self, proxy):
        """Return ``proxy`` as a ``requests``-style proxies dict, or ``None``."""
        if proxy is not None:
            return {"http": proxy, "https": proxy}
        else:
            return None

    def __get_js(self):
        """Download the acrawler script text, going through ``self.proxy``."""
        return requests.get(
            "https://sf16-muse-va.ibytedtos.com/obj/rc-web-sdk-gcs/acrawler.js",
            proxies=self.__format_proxy(self.proxy),
        ).text

Functions

def get_playwright()
Expand source code
def get_playwright():
    """Return the module-wide Playwright driver, starting it on first use.

    The driver is kept in the module-level ``playwright`` variable, so later
    calls return the same object. An exception raised while starting the
    driver propagates unchanged.
    """
    global playwright
    if playwright is not None:
        return playwright

    playwright = sync_playwright().start()
    return playwright

Classes

class browser (**kwargs)
Expand source code
class browser:
    """Headless WebKit session (driven by Playwright) used to sign request URLs.

    Keyword arguments read by the constructor:
        debug: stored on ``self.debug`` (default ``False``).
        proxy: proxy URL, optionally carrying credentials in the form
            ``scheme://user:password@host:port``.
        api_url: stored on ``self.api_url``.
        referer: stored on ``self.referrer``
            (default ``"https://www.tiktok.com/"``).
        language: stored on ``self.language`` (default ``"en"``).
        executablePath: path of the browser executable to launch.
        custom_did: default device id used by :meth:`sign_url`.
        browser_args: extra command-line arguments for the browser.
        browser_options: extra launch options; they override the defaults
            built here (including the proxy settings).
    """

    def __init__(
        self,
        **kwargs,
    ):
        """Launch the browser and record the screen size of a fresh page.

        Raises:
            Exception: whatever Playwright raises when the launch fails; the
                error is logged at CRITICAL level before being re-raised.
        """
        self.debug = kwargs.get("debug", False)
        self.proxy = kwargs.get("proxy", None)
        self.api_url = kwargs.get("api_url", None)
        self.referrer = kwargs.get("referer", "https://www.tiktok.com/")
        self.language = kwargs.get("language", "en")
        self.executablePath = kwargs.get("executablePath", None)
        self.did = kwargs.get("custom_did", None)

        # A missing, empty or ``None`` value means "nothing extra".
        self.args = kwargs.get("browser_args") or []
        options = kwargs.get("browser_options") or {}

        self.options = {
            "headless": True,
            "handle_sigint": True,
            "handle_sigterm": True,
            "handle_sighup": True,
        }

        if self.proxy is not None:
            self.options["proxy"] = self._proxy_options(self.proxy)

        # Caller-supplied options win over the defaults above.
        self.options.update(options)

        if self.executablePath is not None:
            # Playwright's Python launch options are snake_case, like the
            # handle_* keys above; the camelCase key is not accepted.
            self.options["executable_path"] = self.executablePath

        try:
            self.browser = get_playwright().webkit.launch(
                args=self.args, **self.options
            )
        except Exception as e:
            # Log first: a statement placed after ``raise`` never runs.
            logging.critical(e)
            raise

        context = self.create_context(set_useragent=True)
        try:
            self.get_params(context.new_page())
        finally:
            # Release the temporary context even if probing the page fails.
            context.close()

    @staticmethod
    def _proxy_options(proxy):
        """Translate a proxy URL into Playwright's ``proxy`` launch option.

        ``scheme://user:password@host:port`` becomes a dict with ``server``,
        ``username`` and ``password`` keys; a URL without ``@`` is returned
        as ``{"server": proxy}``.
        """
        if "@" not in proxy:
            return {"server": proxy}

        scheme, separator, remainder = proxy.partition("://")
        if not separator:
            # No scheme present: the whole string is "credentials@address".
            scheme, remainder = "", proxy

        # Split on the LAST "@" and the FIRST ":" so that passwords which
        # themselves contain "@" or ":" are kept intact.
        credentials, _, address = remainder.rpartition("@")
        username, _, password = credentials.partition(":")
        return {
            "server": scheme + separator + address,
            "username": username,
            "password": password,
        }

    def get_params(self, page) -> None:
        """Record browser fingerprint values on the instance.

        The language, timezone, platform, name and version fields are
        currently left blank (the page queries that used to fill them are
        disabled); only the screen width and height are read from ``page``.
        """
        self.browser_language = ""
        self.timezone_name = ""
        self.browser_platform = ""
        self.browser_name = ""
        self.browser_version = ""

        self.width = page.evaluate("""() => { return screen.width; }""")
        self.height = page.evaluate("""() => { return screen.height; }""")

    def create_context(self, set_useragent=False):
        """Open a new browser context based on the "iPhone 11 Pro" descriptor.

        The viewport, device scale factor, mobile flag and touch flag are
        randomised on every call.

        Args:
            set_useragent: when true, store the descriptor's user agent on
                ``self.userAgent``.

        Returns:
            The context returned by ``self.browser.new_context``.
        """
        # Work on a copy so the randomised values never leak into the
        # descriptor held by Playwright (it may be shared between calls).
        device = dict(get_playwright().devices["iPhone 11 Pro"])
        device["viewport"] = {
            "width": random.randint(320, 1920),
            "height": random.randint(320, 1920),
        }
        device["device_scale_factor"] = random.randint(1, 3)
        device["is_mobile"] = random.randint(1, 2) == 1
        device["has_touch"] = random.randint(1, 2) == 1

        context = self.browser.new_context(**device)
        if set_useragent:
            self.userAgent = device["user_agent"]

        return context

    def base36encode(self, number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
        """Converts an integer to a base36 string.

        The base is ``len(alphabet)``, so a different alphabet gives a
        different base. Negative numbers are prefixed with ``-``.
        """
        base36 = ""
        sign = ""

        if number < 0:
            sign = "-"
            number = -number

        # Single-digit values (including zero) need no division loop.
        if 0 <= number < len(alphabet):
            return sign + alphabet[number]

        while number != 0:
            number, i = divmod(number, len(alphabet))
            base36 = alphabet[i] + base36

        return sign + base36

    def gen_verifyFp(self):
        """Build a ``verify_<timestamp>_<id>`` fingerprint string.

        ``<timestamp>`` is the current time in milliseconds, base36-encoded
        and lower-cased. ``<id>`` is 36 characters long: underscores at
        offsets 8, 13, 18 and 23, a literal ``4`` at offset 14, and random
        alphanumerics elsewhere (offset 19 is limited to ``8``, ``9``, ``A``
        or ``B``).
        """
        chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
        chars_len = len(chars)
        scenario_title = self.base36encode(int(time.time() * 1000))
        uuid = [0] * 36
        uuid[8] = "_"
        uuid[13] = "_"
        uuid[18] = "_"
        uuid[23] = "_"
        uuid[14] = "4"

        for i in range(36):
            if uuid[i] != 0:
                # Position already holds a fixed character.
                continue
            r = int(random.random() * chars_len)
            # Offset 19 is squeezed into indexes 8..11 of ``chars``.
            uuid[i] = chars[((3 & r) | 8) if i == 19 else r]

        return f'verify_{scenario_title.lower()}_{"".join(uuid)}'

    def sign_url(self, **kwargs):
        """Sign a URL with the in-page ``byted_acrawler`` script.

        Keyword Args:
            url: the URL to sign (required).
            gen_new_verifyFp: when true, generate a fresh fingerprint with
                :meth:`gen_verifyFp`.
            custom_verifyFp: fingerprint to use otherwise; a built-in value
                is used when it is not supplied.
            custom_did: device id for this call; falls back to the instance
                value, then to a random number.

        Returns:
            A ``(verifyFp, did, signature)`` tuple where ``did`` is a string
            and ``signature`` is whatever ``window.byted_acrawler.sign``
            returned.

        Raises:
            Exception: if ``url`` is missing.
        """
        url = kwargs.get("url", None)
        if url is None:
            raise Exception("sign_url required a url parameter")

        if kwargs.get("gen_new_verifyFp", False):
            verifyFp = self.gen_verifyFp()
        else:
            verifyFp = kwargs.get(
                "custom_verifyFp",
                "verify_khgp4f49_V12d4mRX_MdCO_4Wzt_Ar0k_z4RCQC9pUDpX",
            )

        if kwargs.get("custom_did") is not None:
            did = kwargs.get("custom_did")
        elif self.did is None:
            did = random.randint(10000, 999999999)
        else:
            did = self.did
        # Callers may hand in an integer id; the query string needs text.
        did = str(did)

        full_url = url + "&verifyFp=" + verifyFp + "&did=" + did

        context = self.create_context()
        try:
            page = context.new_page()
            page.set_content("<script> " + get_acrawler() + " </script>")
            # Pass the URL as an argument rather than splicing it into the
            # script text, so quotes or backslashes in it cannot break (or
            # alter) the JavaScript being evaluated.
            evaluatedPage = page.evaluate(
                "(url) => window.byted_acrawler.sign({url: url})", full_url
            )
        finally:
            # Always release the context, even when signing fails.
            context.close()
        return (
            verifyFp,
            did,
            evaluatedPage,
        )

    def clean_up(self):
        """Close the browser, logging (not raising) any failure."""
        try:
            self.browser.close()
        except Exception:
            # Best-effort shutdown: keep the original message but attach the
            # traceback so a failed close can be diagnosed.
            logging.info("cleanup failed", exc_info=True)
        # NOTE(review): the module-level Playwright driver is not stopped
        # here - presumably because it may be shared; confirm.

    def find_redirect(self, url):
        """Load ``url`` and store the final page URL on ``self.redirect_url``.

        If the instance has a ``page`` attribute it is used; otherwise a
        temporary context and page are created and closed again afterwards.
        """
        page = getattr(self, "page", None)
        context = None
        if page is None:
            # Nothing in this class assigns ``self.page``, so open a page.
            context = self.create_context()
            page = context.new_page()
        try:
            # Playwright takes the wait condition as a keyword argument.
            page.goto(url, wait_until="load")
            self.redirect_url = page.url
        finally:
            if context is not None:
                context.close()

    def __format_proxy(self, proxy):
        """Return ``proxy`` as a ``requests``-style proxies dict, or ``None``."""
        if proxy is not None:
            return {"http": proxy, "https": proxy}
        else:
            return None

    def __get_js(self):
        """Download the acrawler script text, going through ``self.proxy``."""
        return requests.get(
            "https://sf16-muse-va.ibytedtos.com/obj/rc-web-sdk-gcs/acrawler.js",
            proxies=self.__format_proxy(self.proxy),
        ).text

Methods

def base36encode(self, number, alphabet='0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ')

Converts an integer to a base36 string.

Expand source code
def base36encode(self, number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
    """Render an integer as text using ``alphabet`` as the digit set.

    The base equals ``len(alphabet)`` (36 for the default alphabet).
    Negative numbers are encoded by magnitude and prefixed with ``-``.
    """
    radix = len(alphabet)
    prefix = "-" if number < 0 else ""
    remaining = -number if number < 0 else number

    # A value that fits in one digit (zero included) is returned directly.
    if 0 <= remaining < radix:
        return prefix + alphabet[remaining]

    # Collect digits least-significant first, then flip them.
    digits = []
    while remaining != 0:
        remaining, index = divmod(remaining, radix)
        digits.append(alphabet[index])
    return prefix + "".join(reversed(digits))
def clean_up(self)
Expand source code
def clean_up(self):
    """Close the browser, logging (not raising) any failure."""
    try:
        self.browser.close()
    except Exception:
        # Best-effort shutdown: keep the original message but attach the
        # traceback so a failed close can be diagnosed.
        logging.info("cleanup failed", exc_info=True)
def create_context(self, set_useragent=False)
Expand source code
def create_context(self, set_useragent=False):
    """Open a new browser context based on the "iPhone 11 Pro" descriptor.

    The viewport, device scale factor, mobile flag and touch flag are
    randomised on every call.

    Args:
        set_useragent: when true, store the descriptor's user agent on
            ``self.userAgent``.

    Returns:
        The context returned by ``self.browser.new_context``.
    """
    # Work on a copy so the randomised values never leak into the
    # descriptor held by Playwright (it may be shared between calls).
    device = dict(playwright.devices["iPhone 11 Pro"])
    device["viewport"] = {
        "width": random.randint(320, 1920),
        "height": random.randint(320, 1920),
    }
    device["device_scale_factor"] = random.randint(1, 3)
    device["is_mobile"] = random.randint(1, 2) == 1
    device["has_touch"] = random.randint(1, 2) == 1

    context = self.browser.new_context(**device)
    if set_useragent:
        self.userAgent = device["user_agent"]

    return context
def find_redirect(self, url)
Expand source code
def find_redirect(self, url):
    """Load ``url`` and store the final page URL on ``self.redirect_url``.

    If the instance has a ``page`` attribute it is used; otherwise a
    temporary context and page are created via ``self.create_context`` and
    closed again afterwards.
    """
    page = getattr(self, "page", None)
    context = None
    if page is None:
        # ``self.page`` is not guaranteed to exist, so open a page on demand.
        context = self.create_context()
        page = context.new_page()
    try:
        # Playwright takes the wait condition as a keyword argument.
        page.goto(url, wait_until="load")
        self.redirect_url = page.url
    finally:
        if context is not None:
            context.close()
def gen_verifyFp(self)
Expand source code
def gen_verifyFp(self):
    """Build a ``verify_<timestamp>_<id>`` fingerprint string.

    ``<timestamp>`` is the current time in milliseconds, encoded with
    ``self.base36encode`` and lower-cased. ``<id>`` is 36 characters long:
    underscores at offsets 8, 13, 18 and 23, a literal ``4`` at offset 14,
    and random alphanumerics elsewhere (offset 19 is limited to ``8``,
    ``9``, ``A`` or ``B``).
    """
    alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
    timestamp = self.base36encode(int(time.time() * 1000))
    # Offsets whose character is fixed rather than random.
    fixed = {8: "_", 13: "_", 18: "_", 23: "_", 14: "4"}

    pieces = []
    for position in range(36):
        if position in fixed:
            pieces.append(fixed[position])
            continue
        pick = int(random.random() * len(alphabet))
        if position == 19:
            # Squeeze the index into 8..11 of the alphabet.
            pick = (3 & pick) | 8
        pieces.append(alphabet[pick])

    return f'verify_{timestamp.lower()}_{"".join(pieces)}'
def get_params(self, page) ‑> NoneType
Expand source code
def get_params(self, page) -> None:
    """Record browser fingerprint values on the instance.

    The language, timezone, platform, name and version fields are set to
    empty strings (the page queries that used to fill them are disabled);
    the screen width and height are read from ``page``.
    """
    blank_fields = (
        "browser_language",
        "timezone_name",
        "browser_platform",
        "browser_name",
        "browser_version",
    )
    for field in blank_fields:
        setattr(self, field, "")

    width_script = """() => { return screen.width; }"""
    height_script = """() => { return screen.height; }"""
    self.width = page.evaluate(width_script)
    self.height = page.evaluate(height_script)
def sign_url(self, **kwargs)
Expand source code
def sign_url(self, **kwargs):
    """Sign a URL with the in-page ``byted_acrawler`` script.

    Keyword Args:
        url: the URL to sign (required).
        gen_new_verifyFp: when true, generate a fresh fingerprint with
            ``self.gen_verifyFp()``.
        custom_verifyFp: fingerprint to use otherwise; a built-in value is
            used when it is not supplied.
        custom_did: device id for this call; falls back to ``self.did``,
            then to a random number.

    Returns:
        A ``(verifyFp, did, signature)`` tuple where ``did`` is a string and
        ``signature`` is whatever ``window.byted_acrawler.sign`` returned.

    Raises:
        Exception: if ``url`` is missing.
    """
    url = kwargs.get("url", None)
    if url is None:
        raise Exception("sign_url required a url parameter")

    if kwargs.get("gen_new_verifyFp", False):
        verifyFp = self.gen_verifyFp()
    else:
        verifyFp = kwargs.get(
            "custom_verifyFp",
            "verify_khgp4f49_V12d4mRX_MdCO_4Wzt_Ar0k_z4RCQC9pUDpX",
        )

    if kwargs.get("custom_did") is not None:
        did = kwargs.get("custom_did")
    elif self.did is None:
        did = random.randint(10000, 999999999)
    else:
        did = self.did
    # Callers may hand in an integer id; the query string needs text.
    did = str(did)

    full_url = url + "&verifyFp=" + verifyFp + "&did=" + did

    context = self.create_context()
    try:
        page = context.new_page()
        page.set_content("<script> " + get_acrawler() + " </script>")
        # Pass the URL as an argument rather than splicing it into the
        # script text, so quotes or backslashes in it cannot break (or
        # alter) the JavaScript being evaluated.
        evaluatedPage = page.evaluate(
            "(url) => window.byted_acrawler.sign({url: url})", full_url
        )
    finally:
        # Always release the context, even when signing fails.
        context.close()
    return (
        verifyFp,
        did,
        evaluatedPage,
    )