First cut at wpull 2.x plugin porting; many formatting fixes
falconkirtaran committed Jan 3, 2017
1 parent 365c10d commit 967d5aa
Showing 4 changed files with 280 additions and 67 deletions.
239 changes: 224 additions & 15 deletions pipeline/archive_bot_plugin.py
@@ -1,8 +1,19 @@
""" ArchiveBot wpull 2.x plugin (replaces 1.x hooks)
This module implements the integration layer between ArchiveBot and wpull. In
particular, it handles ignore settings, settings changes, dashboard reporting,
and aborts.
"""

# The ArchiveBot plugin will be split across multiple modules, but
# sys.path for plugins does not include the plugin file's directory.
# We add that here.
import os
import sys
import random
import time
import logging
import re
sys.path.append(os.path.dirname(os.path.realpath(__file__)))

# Import wpull bits used by the plugin.
@@ -11,63 +22,261 @@
from wpull.pipeline.app import AppSession
from wpull.pipeline.item import URLRecord
from wpull.pipeline.session import ItemSession
from wpull.protocol.abstract.request import BaseResponse
from wpull.stats import Statistics
from wpull.url import URLInfo

from archivebot import shared_config
from archivebot.control import Control
from archivebot.wpull import settings as mod_settings


def _extract_response_code(item_session: ItemSession) -> int:
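# Best-effort status extraction: try the HTTP response shape first, then the
# FTP one, and fall back to 0 if neither matches.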
statcode = 0

try:
# duck typing: assume the response is
# wpull.protocol.http.request.Response
statcode = item_session.response.status_code
except (AttributeError, KeyError):
pass

try:
# duck typing: assume the response is
# wpull.protocol.ftp.request.Response
statcode = item_session.response.reply.code
except (AttributeError, KeyError):
pass

return statcode

def is_error(statcode, err):
'''
Determines whether a given status code/error code combination should be
flagged as an error.
'''
# 5xx: yes
if statcode >= 500:
return True

# Response code zero with non-OK wpull code: yes
if err != 'OK':
return True

# Could be an error, but we don't know it as such
return False

def is_warning(statcode, err):
'''
Determines whether a given status code/error code combination should be
flagged as a warning.
'''
return statcode >= 400 and statcode < 500
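# For example: is_error(503, 'OK') -> True (server error), and
# is_error(0, 'ConnectionRefused') -> True (wpull-level failure), while a
# 404 with 'OK' is classified by is_warning() as a warning, not an error.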

class ArchiveBotPlugin(WpullPlugin):
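# Per-job state. Everything below is filled in by activate() from the
# ITEM_IDENT, REDIS_URL and LOG_KEY environment variables and the shared
# channel configuration; until then these attributes are placeholders.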
last_age = 0

ident = None
redis_url = None
log_key = None
log_channel = None
pipeline_channel = None
control = None

settings = None
settings_listener = None

logger = None

def log_ignore(self, url, pattern, source):
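# Ship an 'ignore' event to the job's Redis log so the dashboard can
# report which pattern suppressed the URL.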
packet = dict(
ts=time.time(),
url=url,
pattern=pattern,
type='ignore',
source=source
)

self.control.log(packet, self.ident, self.log_key)

def maybe_log_ignore(self, url, pattern, source):
if not self.settings.suppress_ignore_reports():
self.log_ignore(url, pattern, source)

self.logger.info('Ignore %s using pattern %s', url, pattern)

def log_result(self, url, statcode, error):
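# Record one fetch result for the dashboard/log stream. The 'wget_code'
# field name appears to be kept for compatibility with the existing log
# format; it actually carries wpull's error string.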
packet = dict(
ts=time.time(),
url=url,
response_code=statcode,
wget_code=error,
is_error=is_error(statcode, error),
is_warning=is_warning(statcode, error),
type='download'
)

self.control.log(packet, self.ident, self.log_key)

def print_log(self, *args):
print(*args)
sys.stdout.flush()
self.logger.info(' '.join(str(arg) for arg in args))

def handle_result(self, item_session: ItemSession, error_info: BaseException = None):
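# Shared result handler for handle_response and handle_error: apply ignores,
# log the result, pick up any settings changes, and honour abort requests.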

error = 'OK'
statcode = _extract_response_code(item_session)

# Check raw and normalized URL against ignore list
pattern = self.settings.ignore_url(item_session.url_record)
if pattern:
self.maybe_log_ignore(item_session.url_record.url, pattern, 'handle_result')
return Actions.FINISH

if error_info:
error = error_info['error']

self.log_result(item_session.url_record.url, statcode, error)
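# Settings changes are detected via an age counter; when it advances,
# re-read the job settings and apply the new concurrency to the pipeline.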

settings_age = self.settings.age()
if self.last_age < settings_age:
self.last_age = settings_age
self.print_log("Settings updated: ", self.settings.inspect())
self.app_session.factory.get('Pipeline').concurrency = self.settings.concurrency()

# See that the settings listener is online
self.settings_listener.check()

if self.settings.abort_requested():
self.print_log("Wpull terminating on bot command")

while True:
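# Redis may be momentarily unreachable; keep retrying until the abort
# has actually been recorded.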
try:
self.control.mark_aborted(self.ident)
break
except ConnectionError:
time.sleep(5)

return Actions.STOP

return Actions.NORMAL

def activate(self):
self.ident = os.environ['ITEM_IDENT']
self.redis_url = os.environ['REDIS_URL']
self.log_key = os.environ['LOG_KEY']
self.log_channel = shared_config.log_channel()
self.pipeline_channel = shared_config.pipeline_channel()
self.control = Control(self.redis_url, self.log_channel, self.pipeline_channel)

self.settings = mod_settings.Settings()
self.settings_listener = mod_settings.Listener(self.redis_url, self.settings,
self.control, self.ident)
self.settings_listener.start()

self.last_age = 0
self.logger = logging.getLogger('archivebot.pipeline.wpull_plugin')

self.logger.info('wpull plugin initialization complete for job ID '
'{}'.format(self.ident))


super().activate()
self.logger.info('wpull plugin activated')

def deactivate(self):
super().deactivate()

self.logger.info('wpull plugin deactivated')

@hook(PluginFunctions.accept_url)
def accept_url(self,
item_session: ItemSession,
verdict: bool,
reasons: dict):
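# Veto URLs with schemes we cannot archive or that match an ignore
# pattern; otherwise defer to wpull's own verdict.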

url = item_session.url_record.url_info

if (url.scheme not in ['https', 'http', 'ws', 'wss', 'ftp', 'gopher']
or url.path in [None, '/']):
return False

pattern = self.settings.ignore_url(url.raw)
if pattern:
self.maybe_log_ignore(url.raw, pattern, 'accept_url')
return False

return verdict

@event(PluginFunctions.queued_url)
def queued_url(self, url_info: URLInfo):
# Report one URL added to the queue
self.control.update_items_queued(1)

@event(PluginFunctions.dequeued_url)
def dequeued_url(self, url_info: URLInfo, record_info: URLRecord):
# Report one URL removed from the queue
self.control.update_items_downloaded(1)

@hook(PluginFunctions.handle_pre_response)
def handle_pre_response(self, item_session: ItemSession):
url = item_session.url_record.url_info

try:
# duck typing: assume it was HTTP-like
# like wpull.protocol.http.request.Response
response = item_session.response

ICY_FIELD_PATTERN = re.compile('Icy-|Ice-|X-Audiocast-')
ICY_VALUE_PATTERN = re.compile('icecast', re.IGNORECASE)
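# ICY/Icecast responses are typically endless audio streams; treating
# them as ignores keeps the job from hanging on them.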

if response.version == 'ICY':
self.maybe_log_ignore(url.raw, '[icy version]', 'handle_pre_response')
return Actions.FINISH

for field, value in response.fields.get_all():
if ICY_FIELD_PATTERN.match(field):
self.maybe_log_ignore(url.raw, '[icy field]',
'handle_pre_response')
return Actions.FINISH

if field == 'Server' and ICY_VALUE_PATTERN.match(value):
self.maybe_log_ignore(url.raw, '[icy server]',
'handle_pre_response')
return Actions.FINISH

except (AttributeError, KeyError):
pass

return Actions.NORMAL

@hook(PluginFunctions.handle_response)
def handle_response(self, item_session: ItemSession):
return self.handle_result(item_session)

@hook(PluginFunctions.handle_error)
def handle_error(self, item_session: ItemSession, error: BaseException):
return self.handle_result(item_session, error)

@event(PluginFunctions.finishing_statistics)
def finishing_statistics(self,
app_session: AppSession,
statistics: Statistics):
self.print_log(" ", statistics.bytes, "bytes.")

@hook(PluginFunctions.exit_status)
def exit_status(self, app_session: AppSession, exit_code: int):
self.logger.info('Advising control task {} and settings listener to stop '
'pending termination for ident '
'{}'.format(self.control, self.ident))
self.control.advise_exiting()
self.settings_listener.stop()
return exit_code

@hook(PluginFunctions.wait_time)
def wait_time(self, seconds: float, item_session: ItemSession, error):
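# The configured delay range is in milliseconds; wpull expects seconds,
# hence the division by 1000.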
sl, sh = self.settings.delay_time_range()
return random.uniform(sl, sh) / 1000

# vim: ts=4:sw=4:et:tw=78
12 changes: 7 additions & 5 deletions pipeline/archivebot/control.py
@@ -5,7 +5,6 @@
import logging
import threading
from queue import Queue, Empty
from contextlib import contextmanager
from redis.exceptions import ConnectionError as RedisConnectionError

@@ -66,7 +65,8 @@ def __init__(self, redis_url, log_channel, pipeline_channel):

# if ITEM_IDENT is set, we are running inside a wpull process
self.ident = os.getenv('ITEM_IDENT')
logger.info('Started new control process with ident={}, thread={}, this={}'.format(
self.ident, threading.get_ident(), self))
# and as such this lock will be used to manage count concurrency
self.countslock = threading.Lock()

@@ -84,15 +84,17 @@ def connected(self):
return self.redis is not None

def connect(self):
logger.info('Attempting to connect to redis with ident={}, thread={}'.format(
self.ident, threading.get_ident()))
if self.redis_url is None:
raise ConnectionError('self.redis_url not set')

self.redis = redis.StrictRedis.from_url(self.redis_url,
decode_responses=True)

self.register_scripts()
logger.info('Redis connection successful with ident={}, thread={}'.format(
self.ident, threading.get_ident()))

def disconnect(self):
self.redis = None
@@ -219,7 +221,7 @@ def ship_logs(self):
try:
entry = self.log_queue.get(timeout=5)
with conn(self):
self.log_script(keys=entry['keys'], args=entry['args'], client=pipe)

shipping_count += 1
self.log_queue.task_done()
