#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Author: MoeClub.org
# pip3 install pyrogram tgcrypto aiohttp
import asyncio
import io
import math
import os
from typing import AsyncIterator
from typing import Union

from aiohttp import web
from pyrogram import Client, raw
from pyrogram import file_id
from pyrogram.session import Session, Auth
class TGSteamer:
    """Download Telegram media chunk by chunk through a Pyrogram bot client.

    All methods are classmethods; the class only holds shared state.
    """
    # https://my.telegram.org/ # apiId, apiHash
    # https://telegram.me/BotFather # botToken

    # Decoded FileId objects keyed by their encoded string form.
    cacheFileId = {}
    # Guards reads and writes of cacheFileId.
    lock = asyncio.Lock()

    @classmethod
    async def chunk_size(cls, length):
        """Return a download chunk size in bytes for a file of ``length`` bytes.

        The size is a power-of-two number of KiB, rounded up from
        ``length / 1024`` and clamped to the range 4 KiB .. 1 MiB.
        A non-positive ``length`` yields the 4 KiB minimum instead of a
        ``math.log2`` domain error.
        """
        if length <= 0:
            return 2 ** 2 * 1024
        exponent = math.ceil(math.log2(length / 1024))
        return 2 ** max(min(exponent, 10), 2) * 1024

    @classmethod
    async def offset_fix(cls, offset, chunkSize):
        """Round ``offset`` down to the nearest multiple of ``chunkSize``."""
        return offset - offset % chunkSize

    @classmethod
    async def get_client(cls, apiId, apiHash, botToken, appName=os.path.basename(os.path.abspath(__file__)).split(".")[0]):
        """Create and start an in-memory Pyrogram bot client.

        ``apiId``, ``apiHash`` and ``botToken`` are stringified and stripped
        before use; ``appName`` defaults to this file's base name.
        Returns the started client.
        """
        _client = Client(
            name=appName,
            api_id=int(str(apiId).strip()),
            api_hash=str(apiHash).strip(),
            bot_token=str(botToken).strip(),
            in_memory=True,
        )
        await _client.start()
        # Sanity check that start() really left the client usable.
        assert _client.is_connected and _client.is_initialized
        return _client

    @classmethod
    async def get_file_properties(cls, fileId):
        """Decode an encoded file id string, caching the result.

        The decoded object is guaranteed to expose ``file_size`` and
        ``file_name`` attributes; they default to ``0`` and ``""`` when the
        decoded object does not already have them.
        Any exception raised by ``FileId.decode`` propagates to the caller.
        """
        async with cls.lock:
            fileProperties = cls.cacheFileId.get(fileId)
            if fileProperties is None:
                fileProperties = file_id.FileId.decode(fileId)
                setattr(fileProperties, "file_size", getattr(fileProperties, "file_size", 0))
                setattr(fileProperties, "file_name", getattr(fileProperties, "file_name", ""))
                cls.cacheFileId[fileId] = fileProperties
        return fileProperties

    @classmethod
    async def get_session(cls, client: "Client", data: "file_id.FileId"):
        """Return a started media session for the data center of ``data``.

        Sessions are cached in ``client.media_sessions`` per ``dc_id``.
        When the file lives on a different data center than the client, a
        new auth key is created and the client's authorization is exported
        to that session.
        Returns ``None`` when the session cannot be started or authorized.
        """
        async with client.media_sessions_lock:
            session = client.media_sessions.get(data.dc_id)
            if session is not None:
                return session

            test_mode = await client.storage.test_mode()
            home_dc_id = await client.storage.dc_id()
            is_foreign_dc = data.dc_id != home_dc_id
            if is_foreign_dc:
                auth = await Auth(client, data.dc_id, test_mode).create()
            else:
                auth = await client.storage.auth_key()

            session = Session(client, data.dc_id, auth, test_mode, is_media=True, is_cdn=False)
            try:
                await session.start()
                if is_foreign_dc:
                    exported = await client.invoke(raw.functions.auth.ExportAuthorization(dc_id=data.dc_id))
                    await session.invoke(raw.functions.auth.ImportAuthorization(id=exported.id, bytes=exported.bytes))
                client.media_sessions[data.dc_id] = session
            except Exception:
                # Do not leave a half-initialised session (and its
                # connection) running; cleanup itself is best effort.
                try:
                    await session.stop()
                except Exception:
                    pass
                return None
            return session

    @classmethod
    async def get_location(cls, data: "file_id.FileId"):
        """Build the raw input location used to request ``data``.

        Photos use ``InputPhotoFileLocation``; every other file type uses
        ``InputDocumentFileLocation``. Both receive the same fields.
        """
        if data.file_type == file_id.FileType.PHOTO:
            location_type = raw.types.InputPhotoFileLocation
        else:
            location_type = raw.types.InputDocumentFileLocation
        return location_type(
            id=data.media_id,
            access_hash=data.access_hash,
            file_reference=data.file_reference,
            thumb_size=data.thumbnail_size
        )

    @classmethod
    async def _get_file(cls, session, location, offset, limit):
        """Send one ``upload.GetFile`` request and return its response."""
        return await session.send(
            raw.functions.upload.GetFile(
                location=location,
                offset=offset,
                limit=limit
            ),
        )

    @classmethod
    async def yield_bytes(cls, client: "Client", fileId: Union[str, "file_id.FileId"], offset: int, chunkSize: int) -> AsyncIterator[bytes]:
        """Yield the file's bytes in chunks, starting at ``offset``.

        ``fileId`` may be an encoded string (decoded through
        ``get_file_properties``) or an already decoded FileId object.
        Requests of ``chunkSize`` bytes are issued until an empty chunk is
        returned. Nothing is yielded when the first response is not an
        ``upload.File``.

        Raises:
            Exception: ``"InvalidSession"`` when no media session is available.
        """
        if isinstance(fileId, str):
            # get_file_properties is a coroutine function and must be awaited.
            data = await cls.get_file_properties(fileId)
        else:
            data = fileId
        location = await cls.get_location(data)
        session = await cls.get_session(client, data)
        if session is None:
            raise Exception("InvalidSession")

        response = await cls._get_file(session, location, offset, chunkSize)
        if not isinstance(response, raw.types.upload.File):
            return
        while response.bytes:
            yield response.bytes
            offset += chunkSize
            response = await cls._get_file(session, location, offset, chunkSize)

    @classmethod
    async def download_as_bytesio(cls, client, fileId, chunkSize=1024 * 1024):
        """Download the whole file into memory.

        Returns an ``io.BytesIO`` positioned at the start and holding every
        chunk produced by ``yield_bytes`` from offset 0.

        Raises:
            Exception: ``"InvalidSession"`` when no media session is available.
        """
        buffer = io.BytesIO()
        async for chunk in cls.yield_bytes(client, fileId, 0, chunkSize):
            buffer.write(chunk)
        buffer.seek(0)
        return buffer
class Web:
    """aiohttp handlers that serve Telegram files over HTTP."""

    # Shared downloader and the started Telegram client (set at startup).
    TelegramFile = TGSteamer()
    TelegramFileClient = None
    # Route prefix; also sent as the Server header.
    Index = "TelegramFile"

    @classmethod
    def Headers(cls, **kwargs):
        """Return the default response headers merged with ``kwargs``."""
        headers = {
            "Server": "TelegramFile"
        }
        headers.update(kwargs)
        return headers

    @classmethod
    async def fileHandler(cls, request: web.Request):
        """Resolve the ``fileId`` path segment and stream the file.

        Any failure while resolving the id or building the response is
        turned into a plain-text 404 whose body is the exception message.
        """
        try:
            _fileId = str(request.match_info["fileId"]).strip("/")
            if not _fileId:
                raise Exception("Invalid FileId")
            try:
                fileId = await cls.TelegramFile.get_file_properties(_fileId)
            except Exception as e:
                raise Exception("Invalid FileId") from e
            return await cls.streamer(request=request, fileId=fileId)
        except Exception as e:
            return web.Response(text=str(e).strip(), status=404, headers={"Server": cls.Index}, content_type="text/plain")

    @staticmethod
    def _parse_range(range_header, file_size):
        """Parse a single-range ``Range`` header into inclusive byte positions.

        Supports ``bytes=a-b``, ``bytes=a-`` and the suffix form ``bytes=-n``.
        Returns ``(first, last)`` with ``last`` clamped to ``file_size - 1``,
        or ``None`` when the header is absent, malformed, multi-range, or has
        ``last < first`` (such headers are ignored and the whole file served).
        ``first`` may be >= ``file_size``; the caller treats that as
        unsatisfiable.
        """
        if not range_header or not range_header.startswith("bytes="):
            return None
        spec = range_header[len("bytes="):].strip()
        if "," in spec:
            return None
        first_text, separator, last_text = spec.partition("-")
        if not separator:
            return None
        first_text = first_text.strip()
        last_text = last_text.strip()
        try:
            if not first_text:
                # Suffix range: the final n bytes of the file.
                suffix_length = int(last_text)
                if suffix_length <= 0:
                    return None
                first = max(file_size - suffix_length, 0)
                last = file_size - 1
            else:
                first = int(first_text)
                last = int(last_text) if last_text else file_size - 1
        except ValueError:
            return None
        if first < 0 or last < first:
            return None
        return first, min(last, file_size - 1)

    @staticmethod
    async def _slice_stream(chunks, skip, length):
        """Yield exactly ``length`` bytes from ``chunks`` after dropping ``skip`` bytes.

        Needed because downloads start on a chunk-aligned offset, which may
        lie before the first requested byte.
        """
        remaining = length
        async for chunk in chunks:
            if remaining <= 0:
                break
            if skip >= len(chunk):
                skip -= len(chunk)
                continue
            if skip:
                chunk = chunk[skip:]
                skip = 0
            if len(chunk) > remaining:
                chunk = chunk[:remaining]
            remaining -= len(chunk)
            yield chunk
            if remaining <= 0:
                break

    @classmethod
    async def streamer(cls, request: web.Request, fileId: file_id.FileId):
        """Build the streaming response for ``fileId``.

        Range requests are honoured only when ``fileId.file_size`` is
        positive. A valid range yields 206 with ``Content-Range`` and an
        exact ``Content-Length``; a range starting at or past the end of the
        file yields 416; everything else yields 200 with the whole file.
        """
        file_size = fileId.file_size
        rangeSupport = file_size > 0
        file_name = str(fileId.media_id).strip() if fileId.file_name == "" else str(fileId.file_name).strip()
        headers = {
            "Content-Type": "application/octet-stream",
            "Content-Disposition": f'attachment; filename="{file_name}"',
            "Server": cls.Index,
        }

        byte_range = None
        if rangeSupport:
            headers["Accept-Ranges"] = "bytes"
            byte_range = cls._parse_range(request.headers.get("Range"), file_size)

        if byte_range is None:
            # No usable Range header: send the whole file from offset 0.
            if file_size <= 0:
                chunk_size = 1024 * 1024
            else:
                chunk_size = await cls.TelegramFile.chunk_size(file_size)
            body = cls.TelegramFile.yield_bytes(cls.TelegramFileClient, fileId, 0, chunk_size)
            return web.Response(status=200, body=body, headers=headers)

        from_bytes, until_bytes = byte_range
        if from_bytes >= file_size:
            return web.Response(
                status=416,
                headers={"Server": cls.Index, "Content-Range": f"bytes */{file_size}"},
            )

        req_length = until_bytes - from_bytes + 1
        headers["Content-Length"] = str(req_length)
        headers["Content-Range"] = f"bytes {from_bytes}-{until_bytes}/{file_size}"

        chunk_size = await cls.TelegramFile.chunk_size(file_size)
        # Downloads must start on a chunk boundary; the surplus leading
        # bytes are trimmed again by _slice_stream.
        offset = from_bytes - (from_bytes % chunk_size)
        chunks = cls.TelegramFile.yield_bytes(cls.TelegramFileClient, fileId, offset, chunk_size)
        body = cls._slice_stream(chunks, from_bytes - offset, req_length)
        return web.Response(status=206, body=body, headers=headers)
if __name__ == "__main__":
    # Start the Telegram client first, then serve files over HTTP on the
    # same event loop.
    # NOTE: "appId", "appHash" and "botToken" are placeholders and must be
    # replaced with real credentials; int("appId") raises ValueError as is.
    loop = asyncio.get_event_loop()
    Web.TelegramFileClient = loop.run_until_complete(Web.TelegramFile.get_client(
        int("appId"),
        str("appHash"),
        str("botToken")
    ))
    app = web.Application()
    # Raw string so that \w reaches the route pattern without being treated
    # as an (invalid) string escape.
    route_path = str("/{}").format(str(Web.Index).strip("/")) + r'{fileId:/[-_\w]+}'
    app.add_routes([web.get(path=route_path, handler=Web.fileHandler, allow_head=False)])
    logging_format = '%t %a %s %r [%Tfs]'
    web.run_app(app=app, host="0.0.0.0", port=63838, access_log_format=logging_format, loop=loop)
|