История изменений
Исправление vvn_black, (текущая версия) :
Как-то так на 3.7, если поток не рабочий check
выкинет эксепшен, который надо обработать.
import asyncio
import aiohttp
URL = 'http://rtmp.domain.tld:2300'
async def check(url, chunk_size, timeout_sec):
timeout = aiohttp.ClientTimeout(total=timeout_sec)
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=timeout) as resp:
content = await resp.content.read(chunk_size)
assert chunk_size == len(content)
asyncio.run(check(URL, 256, 5))
Если хочется request и синхронности, то по сути также.
Исходная версия vvn_black, :
Как-то так на 3.7, если поток не рабочий check
выкинет эксепшен, который надо обработать.
import asyncio
import aiohttp
URL = 'http://rtmp.domain.tld:2300'
async def check(url, chunk_size):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
content = await resp.content.read(chunk_size)
assert chunk_size == len(content)
asyncio.run(check(URL, 256))
Если хочется request и синхронности, то по сути также.