Paul Chernoch
Paul Chernoch

Reputation: 5553

Fetch many image files from the web and save them asynchronously

I have a list of Web URLs to image files. I wish to fetch all the image files and write them each to the appropriate directory. The images are all PNGs. In a test program I am able to successfully fetch a single image synchronously:

import urllib.request
import shutil

# This example will download a single traffic image.

# Spoof a well-known browser so servers won't think I am a bot.
class AppURLopener:
    """Minimal URL opener that identifies itself with a browser User-Agent.

    This replaces a subclass of ``urllib.request.FancyURLopener``. That base
    class has been deprecated since Python 3.3 (and was removed in Python
    3.14), and it does not raise on HTTP error statuses, so an error page
    could be written to disk as if it were the requested image.
    """

    # Value sent as the User-Agent request header.
    version = "Mozilla/5.0"

    def open(self, fullurl, data=None):
        """Open *fullurl* and return the response object.

        The response can be used as a context manager and read like a file.
        Raises ``urllib.error.URLError`` (``HTTPError`` for 4xx/5xx statuses)
        when the request fails.
        """
        request = urllib.request.Request(
            fullurl, data=data, headers={"User-Agent": self.version})
        return urllib.request.urlopen(request)


def getTrafficImage(fromUrl, toPath, baseUrl="https://mass511.com/map/Cctv/"):
    """Download one traffic image and save it to *toPath*.

    fromUrl -- camera-specific suffix appended to *baseUrl*
    toPath  -- local file that receives the downloaded bytes
    baseUrl -- common URL prefix; defaults to the Mass511 camera endpoint

    Raises ``urllib.error.URLError`` if the request fails and ``OSError`` if
    the local file cannot be written.
    """
    url = f"{baseUrl}{fromUrl}"
    opener = AppURLopener()
    # The request is opened before the output file, so a failed request does
    # not leave an empty file behind.
    with opener.open(url) as response, open(toPath, 'wb') as out_file:
        shutil.copyfileobj(response, out_file)

# Camera label on MASS511:
#   I-93-SB-Somerville-Exit 26 Storrow Dr
# This value is the camera-specific URL suffix passed to getTrafficImage.
url = "406443--1"

# Path of the local file that will receive the downloaded image
file_name = "C:/Users/pchernoch/projects/HackWeek/traffic-feeds/I-93-SB-Somerville-Exit.png"

# Fetch that single image once, synchronously.
getTrafficImage(url, file_name)

How can I repeat this for many URLs and have each fetch performed asynchronously?

If any image cannot be fetched or has an error (like a timeout), I wish to log that error to the console but not stop processing the other files.

I am using Python 3.6.2. My preference is to use the new async/await approach and the aiohttp and asyncio libraries. However, any popular async library (e.g. curio) will do. I have only been programming in Python for one week, so much is confusing. This answer looks useful, but I do not know how to make use of it: asyncio web scraping 101: fetching multiple urls with aiohttp

Goal: The task to be accomplished is capturing traffic camera images from many Boston cameras every few seconds for a set period of time.

The following is the program I am trying to write, with TODO: marks at the places I am stumped. The program runs on a timer. Every few seconds it will capture another set of images from each of the traffic cameras. The timer loop is not asynchronous, but I want the image capture of many URLs to be async.

import sys, os, datetime, threading, time
import urllib.request
import shutil

# ==================
#    Configuration
# ==================

# Maps each camera's display name to its URL suffix on the Mass511 web site.
# CameraPoller also uses the display name as the camera's sub-directory name.
CAMERA_URLS = {
  "I-93-SB-Somerville-Exit 26 Storrow Dr": "406443--1",
  "Road STWB-WB-TNL-Storrow WB": "1407--1",
  "I-93-NB-Dorchester-between x14 & x15 Savin": "406557"
  }

# All cameras have URLs that begin with this prefix; the suffix from
# CAMERA_URLS is appended directly to it.
BASE_URL = "https://mass511.com/map/Cctv/"

# Store photos in per-camera subdirectories under this directory
PHOTO_STORAGE_DIR = "C:/Users/pchernoch/projects/HackWeek/traffic-feeds"

# Number of polling rounds; each round takes one picture from every camera
SNAP_COUNT = 5

# Target number of seconds between the starts of successive polling rounds
POLLING_INTERVAL_SECONDS = 2

# ==================
#      Classes
# ==================

def logMessage(msg):
    """Write *msg* followed by a newline to standard output, then flush.

    The text and its newline go out in a single write call.
    """
    stream = sys.stdout
    line = msg + '\n'
    stream.write(line)
    stream.flush()

# Change the presumed name of the browser to fool robot detectors
class AppURLopener:
    """Minimal URL opener that identifies itself with a browser User-Agent.

    This replaces a subclass of ``urllib.request.FancyURLopener``. That base
    class has been deprecated since Python 3.3 (and was removed in Python
    3.14), and it does not raise on HTTP error statuses.
    """

    # Value sent as the User-Agent request header.
    version = "Mozilla/5.0"

    def open(self, fullurl, data=None):
        """Open *fullurl* and return the response object.

        The response can be used as a context manager and read like a file.
        Raises ``urllib.error.URLError`` (``HTTPError`` for 4xx/5xx statuses)
        when the request fails.
        """
        request = urllib.request.Request(
            fullurl, data=data, headers={"User-Agent": self.version})
        return urllib.request.urlopen(request)

# Can Read file from one camera and save to a file
class Camera(object):
    """A single camera: its source URL plus the directory its snapshots go to."""

    def __init__(self, sourceUrl, targetDirectory, name, extension):
        self.Name = name
        self.Extension = extension
        self.SourceUrl = sourceUrl
        self.TargetDirectory = targetDirectory

    def TargetFile(self, time):
        """Return the snapshot path for *time*.

        The result has the form <TargetDirectory>/<YYYY-mm-dd-HH-MM-SS>.<Extension>.
        """
        stamp = time.strftime("%Y-%m-%d-%H-%M-%S")
        return "{}/{}.{}".format(self.TargetDirectory, stamp, self.Extension)

    def Get(self):
        """Log the snapshot that would be taken now; the download is still a TODO."""
        destination = self.TargetFile(datetime.datetime.now())
        logMessage(f"  - For camera {self.Name}, get {self.SourceUrl} and save as {destination}")
        # TODO: GET IMAGE FILE FROM WEB AND SAVE IN FILE HERE

# Can poll multiple cameras once
class CameraPoller(object):
    """Builds one Camera per configured URL and polls all of them on Snap()."""

    def __init__(self, urlMap, baseUrl, rootDir):
        """Create the cameras and make sure each one's directory exists.

        urlMap  -- mapping of camera name to URL suffix
        baseUrl -- prefix placed in front of every URL suffix
        rootDir -- directory under which one sub-directory per camera is made
        """
        self.CamerasToRead = []
        for cameraName, urlSuffix in urlMap.items():
            url = f"{baseUrl}{urlSuffix}"
            targetDir = f"{rootDir}/{cameraName}"
            # exist_ok=True replaces the exists()-then-makedirs() pair, which
            # could fail if the directory appeared between the two calls.
            os.makedirs(targetDir, exist_ok=True)
            self.CamerasToRead.append(Camera(url, targetDir, cameraName, "png"))

    def Snap(self):
        """Ask every camera, one after another, for a snapshot."""
        # TODO: MAKE THIS LOOP ASYNC
        for camera in self.CamerasToRead:
            camera.Get()

# Repeatedly poll all cameras, then sleep
def get_images(poller, pollingInterval, snapCount):
    """Poll all cameras *snapCount* times, aiming for one poll per interval.

    poller          -- object whose Snap() method captures one set of images
    pollingInterval -- target number of seconds between the starts of polls
    snapCount       -- number of polls to perform
    """
    next_call = time.time()
    for _ in range(snapCount):
        timeString = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        logMessage(f"\nPoll at {timeString}")
        poller.Snap()
        next_call += pollingInterval
        # time.sleep() raises ValueError for a negative delay, which is what
        # the raw difference becomes when a poll takes longer than the
        # interval. Clamp it so an overrun starts the next poll immediately.
        time.sleep(max(0.0, next_call - time.time()))

# ==================
#    Application
# ==================

if __name__ == "__main__":

    cameraPoller = CameraPoller(CAMERA_URLS, BASE_URL, PHOTO_STORAGE_DIR)

    # Poll the cameras in a separate thread. The thread is NOT a daemon, and
    # it is joined right after starting, so the program waits for all polls.
    timerThread = threading.Thread(
        target=get_images,
        args=(cameraPoller, POLLING_INTERVAL_SECONDS, SNAP_COUNT),
        daemon=False,
    )
    timerThread.start()
    timerThread.join()

    # Report when polling finished.
    endTime = datetime.datetime.now()
    endTimeString = endTime.strftime("%Y-%m-%d-%H-%M-%S")
    logMessage(f"Exiting Poller at {endTimeString}")

Upvotes: 1

Views: 2540

Answers (2)

Arthur
Arthur

Reputation: 4251

Here is an asyncio version. Untested, but shouldn't be too far off.

With asyncio, basically you launch all your tasks, and gather the results with asyncio.gather. But starting tons of requests concurrently won't work, so I also added a Semaphore in CameraPoller: this ensures that at most 10 concurrent requests will run.

import asyncio
import datetime
import os
import time

import aiohttp


# ==================
#    Configuration
# ==================

# Maps each camera's display name to its URL suffix on the Mass511 web site.
# CameraPoller also uses the display name as the camera's sub-directory name.
CAMERA_URLS = {
  "I-93-SB-Somerville-Exit 26 Storrow Dr": "406443--1",
  "Road STWB-WB-TNL-Storrow WB": "1407--1",
  "I-93-NB-Dorchester-between x14 & x15 Savin": "406557"
}

# All cameras have URLs that begin with this prefix; the suffix from
# CAMERA_URLS is appended directly to it.
BASE_URL = "https://mass511.com/map/Cctv/"

# Store photos in per-camera subdirectories under this directory
PHOTO_STORAGE_DIR = "C:/Users/pchernoch/projects/HackWeek/traffic-feeds"

# Number of polling rounds; each round takes one picture from every camera
SNAP_COUNT = 5

# Target number of seconds between the starts of successive polling rounds
POLLING_INTERVAL_SECONDS = 2

# Sent as the User-Agent header with every image request
USER_AGENT = 'Mozilla/5.0'

# ==================
#      Classes
# ==================

def logMessage(msg):
  """Print *msg* to standard output."""
  print(msg)

# Can Read file from one camera and save to a file
class Camera:
    """A single camera whose image is fetched through a shared aiohttp session."""

    def __init__(self, session, sourceUrl, targetDirectory, name, extension):
        """Remember the session, the image URL and where snapshots are stored."""
        self.session = session
        self.SourceUrl = sourceUrl
        self.TargetDirectory = targetDirectory
        self.Name = name
        self.Extension = extension

    def TargetFile(self, time):
        """Return the snapshot path for *time*.

        The result has the form <TargetDirectory>/<YYYY-mm-dd-HH-MM-SS>.<Extension>.
        """
        timeStamp = time.strftime("%Y-%m-%d-%H-%M-%S")
        return f"{self.TargetDirectory}/{timeStamp}.{self.Extension}"

    async def Get(self):
        """Download the current image and save it to a time-stamped file.

        A failed request, a timeout or a failed write is logged to the console
        and then swallowed, so one bad camera does not stop the others.
        """
        fileName = self.TargetFile(datetime.datetime.now())
        try:
            # Request image file from remote server
            async with self.session.get(self.SourceUrl, headers={'User-Agent': USER_AGENT}) as resp:
                # Turn 4xx/5xx answers into exceptions so that an error page
                # is never written to disk as if it were an image.
                resp.raise_for_status()
                data = await resp.read()
            # and save to disk locally.
            # NOTE: this write is a plain blocking call made from the coroutine.
            with open(fileName, 'wb') as out_file:
                out_file.write(data)
        except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as err:
            logMessage(f"  - For camera {self.Name}, FAILED to get {self.SourceUrl}: {err!r}")
            return
        logMessage(f"  - For camera {self.Name}, get {self.SourceUrl} and save as {fileName}")


# Can poll multiple cameras once
class CameraPoller:
    """Polls a set of cameras concurrently, with a cap on requests in flight."""

    def __init__(self, session, urlMap, baseUrl, rootDir, concurrency=10):
        """Create the cameras and make sure each one's directory exists.

        session     -- aiohttp client session shared by all cameras
        urlMap      -- mapping of camera name to URL suffix
        baseUrl     -- prefix placed in front of every URL suffix
        rootDir     -- directory under which one sub-directory per camera is made
        concurrency -- maximum number of downloads running at the same time
        """
        self.CamerasToRead = []
        for cameraName, urlSuffix in urlMap.items():
            url = f"{baseUrl}{urlSuffix}"
            targetDir = f"{rootDir}/{cameraName}"
            # exist_ok=True replaces the exists()-then-makedirs() pair, which
            # could fail if the directory appeared between the two calls.
            os.makedirs(targetDir, exist_ok=True)
            self.CamerasToRead.append(Camera(session, url, targetDir, cameraName, "png"))

        self.sem = asyncio.BoundedSemaphore(concurrency)

    async def _snap(self, camera):
        """Fetch one camera while holding a slot of the concurrency semaphore."""
        async with self.sem:
            await camera.Get()

    async def Snap(self):
        """Fetch every camera once, concurrently, and log any failures."""
        # return_exceptions=True keeps one failing camera from aborting the
        # whole round; failures come back in the result list instead.
        results = await asyncio.gather(
            *(self._snap(cam) for cam in self.CamerasToRead),
            return_exceptions=True)
        # gather() returns results in the order the awaitables were passed.
        for cam, result in zip(self.CamerasToRead, results):
            if isinstance(result, Exception):
                logMessage(f"  - For camera {cam.Name}, FAILED to get {cam.SourceUrl}: {result!r}")

    # Repeatedly poll all cameras, then sleep
    async def poll(self, pollingInterval, snapCount):
        """Run *snapCount* polling rounds, aiming for one per *pollingInterval* seconds."""
        loop = asyncio.get_event_loop()
        next_call = loop.time()
        for _ in range(snapCount):
            timeString = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
            logMessage(f"\nPoll at {timeString}")

            await self.Snap()

            next_call += pollingInterval
            # Never request a negative delay when a round overran its interval.
            await asyncio.sleep(max(0.0, next_call - loop.time()))

# ==================
#    Application
# ==================

async def main():
    """Open one HTTP session, run all polling rounds, then log the exit time."""
    # ClientSession has to be instantiated (note the parentheses) before it
    # can be used as an async context manager.
    async with aiohttp.ClientSession() as session:
        poller = CameraPoller(session, CAMERA_URLS, BASE_URL, PHOTO_STORAGE_DIR)
        await poller.poll(POLLING_INTERVAL_SECONDS, SNAP_COUNT)

    endTime = datetime.datetime.now()
    endTimeString = endTime.strftime("%Y-%m-%d-%H-%M-%S")
    logMessage(f"Exiting Poller at {endTimeString}")


if __name__ == "__main__":
    # Drive main() to completion on the current event loop.
    asyncio.get_event_loop().run_until_complete(main())

Upvotes: 0

Paul Chernoch
Paul Chernoch

Reputation: 5553

Here is the same code, with the URL grabbing done using ThreadPoolExecutor. It required the fewest changes to my code. Thanks to @larsks for pointing me in the right direction.

import sys, os, datetime, threading, time
import urllib.request
from concurrent.futures import ThreadPoolExecutor
import shutil

# ==================
#    Configuration
# ==================

# Maps each camera's display name to its URL suffix on the Mass511 web site.
# CameraPoller also uses the display name as the camera's sub-directory name.
CAMERA_URLS = {
  "I-93-SB-Somerville-Exit 26 Storrow Dr": "406443--1",
  "Road STWB-WB-TNL-Storrow WB": "1407--1",
  "I-93-NB-Dorchester-between x14 & x15 Savin": "406557"
  }

# All cameras have URLs that begin with this prefix; the suffix from
# CAMERA_URLS is appended directly to it.
BASE_URL = "https://mass511.com/map/Cctv/"

# Store photos in per-camera subdirectories under this directory
PHOTO_STORAGE_DIR = "C:/Users/pchernoch/projects/HackWeek/traffic-feeds"

# Number of polling rounds; each round takes one picture from every camera
SNAP_COUNT = 5

# Target number of seconds between the starts of successive polling rounds
POLLING_INTERVAL_SECONDS = 2

# ==================
#      Classes
# ==================

def logMessage(msg):
    """Emit *msg* plus a trailing newline on standard output and flush it.

    The newline is joined to the text before printing, so each message is
    handed to the stream as one string.
    """
    print(msg + '\n', end='', flush=True)

# Change the presumed name of the browser to fool robot detectors
class AppURLopener:
    """Minimal URL opener that identifies itself with a browser User-Agent.

    This replaces a subclass of ``urllib.request.FancyURLopener``. That base
    class has been deprecated since Python 3.3 (and was removed in Python
    3.14), and it does not raise on HTTP error statuses, so an error page
    could be written to disk as if it were the requested image.
    """

    # Value sent as the User-Agent request header.
    version = "Mozilla/5.0"

    def open(self, fullurl, data=None):
        """Open *fullurl* and return the response object.

        The response can be used as a context manager and read like a file.
        Raises ``urllib.error.URLError`` (``HTTPError`` for 4xx/5xx statuses)
        when the request fails.
        """
        request = urllib.request.Request(
            fullurl, data=data, headers={"User-Agent": self.version})
        return urllib.request.urlopen(request)

# Can Read file from one camera and save to a file
class Camera(object):
    """A single camera: downloads its current image into a time-stamped file."""

    def __init__(self, sourceUrl, targetDirectory, name, extension):
        self.Name = name
        self.Extension = extension
        self.SourceUrl = sourceUrl
        self.TargetDirectory = targetDirectory

    def TargetFile(self, time):
        """Return the snapshot path for *time*.

        The result has the form <TargetDirectory>/<YYYY-mm-dd-HH-MM-SS>.<Extension>.
        """
        stamp = time.strftime("%Y-%m-%d-%H-%M-%S")
        return "{}/{}.{}".format(self.TargetDirectory, stamp, self.Extension)

    def Get(self):
        """Fetch the image, store it, log what was done and return that log text."""
        destination = self.TargetFile(datetime.datetime.now())
        summary = f"  - For camera {self.Name}, get {self.SourceUrl} and save as {destination}"
        # Request image file from remote server and save to disk locally.
        # The output file is opened only once the request has succeeded.
        with AppURLopener().open(self.SourceUrl) as response:
            with open(destination, 'wb') as out_file:
                shutil.copyfileobj(response, out_file)
        logMessage(summary)
        return summary

def snap_picture(camera):
    """Take one snapshot with *camera* and hand back whatever its Get() returns.

    This module-level function is what the thread pool runs for each camera.
    """
    outcome = camera.Get()
    return outcome


# Can poll multiple cameras once
class CameraPoller(object):
    """Builds one Camera per configured URL and polls them all on a thread pool."""

    def __init__(self, urlMap, baseUrl, rootDir):
        """Create the cameras and make sure each one's directory exists.

        urlMap  -- mapping of camera name to URL suffix
        baseUrl -- prefix placed in front of every URL suffix
        rootDir -- directory under which one sub-directory per camera is made
        """
        self.CamerasToRead = []
        for cameraName, urlSuffix in urlMap.items():
            url = f"{baseUrl}{urlSuffix}"
            targetDir = f"{rootDir}/{cameraName}"
            # exist_ok=True replaces the exists()-then-makedirs() pair, which
            # could fail if the directory appeared between the two calls.
            os.makedirs(targetDir, exist_ok=True)
            self.CamerasToRead.append(Camera(url, targetDir, cameraName, "png"))

    def Snap(self):
        """Fetch every camera once on worker threads and log any failures.

        An exception raised while fetching one camera is reported on the
        console and does not prevent the other cameras from being processed.
        """
        with ThreadPoolExecutor(max_workers=10) as executor:
            pending = [(camera, executor.submit(snap_picture, camera))
                       for camera in self.CamerasToRead]
            for camera, future in pending:
                try:
                    # result() re-raises whatever the worker raised. Without
                    # collecting results, worker errors vanish silently.
                    future.result()
                except Exception as err:
                    logMessage(f"  - For camera {camera.Name}, FAILED to get {camera.SourceUrl}: {err!r}")

# Repeatedly poll all cameras, then sleep
def get_images(poller, pollingInterval, snapCount):
    """Poll all cameras *snapCount* times, aiming for one poll per interval.

    poller          -- object whose Snap() method captures one set of images
    pollingInterval -- target number of seconds between the starts of polls
    snapCount       -- number of polls to perform
    """
    next_call = time.time()
    for _ in range(snapCount):
        timeString = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        logMessage(f"\nPoll at {timeString}")
        poller.Snap()
        next_call += pollingInterval
        # time.sleep() raises ValueError for a negative delay, which is what
        # the raw difference becomes when the downloads take longer than the
        # interval. Clamp it so an overrun starts the next poll immediately.
        time.sleep(max(0.0, next_call - time.time()))

# ==================
#    Application
# ==================

if __name__ == "__main__":

    cameraPoller = CameraPoller(CAMERA_URLS, BASE_URL, PHOTO_STORAGE_DIR)

    # Poll the cameras in a separate thread. The thread is NOT a daemon, and
    # it is joined right after starting, so the program waits for all polls.
    pollArguments = (cameraPoller, POLLING_INTERVAL_SECONDS, SNAP_COUNT)
    timerThread = threading.Thread(target=get_images, args=pollArguments, daemon=False)
    timerThread.start()
    timerThread.join()

    # Report when polling finished.
    endTime = datetime.datetime.now()
    endTimeString = endTime.strftime("%Y-%m-%d-%H-%M-%S")
    logMessage(f"Exiting Poller at {endTimeString}")

Upvotes: 1

Related Questions