forked from JulienPalard/certificate_watcher
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcertificate_watcher.py
executable file
·243 lines (207 loc) · 7.28 KB
/
certificate_watcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
#!/usr/bin/env python3
"""Prints (on stderr) the list and expiration time ssl certificats of
all given domains like:
./warn_expire.py mdk.fr python.org duckduckgo.com
mdk.fr expire in 2 days
"""
import argparse
import csv
import re
import socket
import ssl
import sys
from datetime import datetime, timedelta
from ocspchecker import ocspchecker
__version__ = "0.1.1"
def get_server_certificate(service, timeout=10):
"""Retrieve the certificate from the server at the specified address" """
context = ssl.create_default_context()
context.options &= ssl.CERT_REQUIRED
context.check_hostname = True
with socket.create_connection(service.address, timeout) as sock:
with context.wrap_socket(sock, server_hostname=service.hostname) as sslsock:
return sslsock.getpeercert()
def parse_args():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
prog="Certificate Watcher",
description="Watch expiration of certificates of a bunch of websites.",
)
parser.add_argument(
"--verbose",
"-v",
action="count",
default=0,
help="Add OK lines if all tests are OK",
)
parser.add_argument(
"--csv", "-c",
nargs='?',
const=',',
default=False,
type=str, help="Output as coma-separated values, optionnal value for delimiter possible default ','"
)
parser.add_argument(
"--attention",
"-a",
action="store_true",
help=r"Add '\a' in case of KO in order to generate beeps "
"(depending of the terminal)",
)
parser.add_argument(
"--check-ocsp",
"-o",
action="store_true",
help="OCSP CRL check, time consuming, advance checks not supported currently",
)
parser.add_argument(
"--low",
"-l",
default=15,
type=int,
help="Number of days before expiration considered as low (default 15 days)",
)
parser.add_argument(
"--high",
"-H",
default=365,
type=int,
help="Number of days after validation considered as high (default 365 days)",
)
parser.add_argument(
"--timeout",
"-t",
default=10.0,
type=float,
help="Number of seconds (real) before timeout (default 10.0 seconds)",
)
parser.add_argument(
"-f",
"--from-file",
type=argparse.FileType("r"),
help="Check host from this file (one per line)",
)
parser.add_argument("hosts", nargs="*", help="Hosts to check")
parser.add_argument(
"--version", action="version", version="%(prog)s " + __version__
)
return parser.parse_args()
class Service:
"""Represent a host or an host:port pair (port defaults to 443).
Optionally the host:port pair can be augmented by an IP to bypass
DNS resolution.
The optional IP address is given prefixed by an `@`, this is usefull to
poke multiple backends like:
s1 = Service("[email protected]")
s2 = Service("[email protected]")
A domain name, to be resolved, can be used in this field too, like:
s1 = Service("[email protected]")
s2 = Service("[email protected]")
The `@host` and `:port` have no specific order, both
"example.com:[email protected]" and "[email protected]:443" are
parsed equally.
"""
SPEC = "(?P<ip>@[^@:]+)|(?P<port>:[^@:]+)|(?P<hostname>[^@:]+)"
def __init__(self, description):
self.description = description
self.ip_addr = None
self.port = 443
self.hostname = None
for token in re.finditer(Service.SPEC, description):
kind = token.lastgroup
value = token.group()
if kind == "ip":
self.ip_addr = value[1:]
if kind == "port":
self.port = int(value[1:])
if kind == "hostname":
self.hostname = value
if self.hostname is None:
raise ValueError("A service cannot have no hostname.")
def __repr__(self):
return self.description
@property
def address(self):
"""Return a 2-tuple (host, port).
If ip is given, (ip, port) is returned instead.
"""
return (self.ip_addr or self.hostname, self.port)
class CertificateValidationError(Exception):
"""Raised by validate_certificate on any certificate error."""
def validate_certificate(
service: Service,
limitlow: timedelta = timedelta(days=15),
limithigh: timedelta = timedelta(days=365),
check_ocsp: bool = False,
timeout=10,
):
"""Check for a certificate validity on a remote host.
Raises CertificateValidationError with a specific message if an
issue is found.
>>> validate_certificate(Service("mdk.fr"))
"""
try:
cert = get_server_certificate(service, timeout=timeout)
except socket.timeout as err:
raise CertificateValidationError("connect timeout") from err
except ConnectionResetError as err:
raise CertificateValidationError("Connection reset") from err
except Exception as err:
raise CertificateValidationError(str(err)) from err
else:
not_after = datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y GMT")
not_before = datetime.strptime(cert["notBefore"], "%b %d %H:%M:%S %Y GMT")
expire_in = not_after - datetime.utcnow()
certificate_age = datetime.utcnow() - not_before
if (
bool(check_ocsp)
and ocspchecker.get_ocsp_status(service.hostname, service.port)[2]
== "OCSP Status: REVOKED"
):
raise CertificateValidationError("OCSP Satus: REVOKED")
if expire_in < limitlow:
raise CertificateValidationError(
f"Certificate expires in {expire_in.total_seconds() // 86400:.0f} days"
)
if certificate_age > limithigh:
raise CertificateValidationError(
"Certificate is too old (has been created "
f"{certificate_age.total_seconds() // 86400:.0f} days ago)"
)
def printrow(row):
"""The non-csv printer used by main."""
print(*row, sep=": ")
def main():
"""Command-line tool (certificate_watcher) entry point."""
args = parse_args()
if args.csv:
writer = csv.writer(sys.stdout, delimiter=args.csv)
writer.writerow(["Service", "Status"])
writerow = writer.writerow
else:
writerow = printrow
if args.from_file:
args.hosts.extend(
host.strip()
for host in args.from_file.read().split("\n")
if host and not host.startswith("#")
)
args.from_file.close()
for service in map(Service, args.hosts):
try:
validate_certificate(
service,
limitlow=timedelta(days=args.low),
limithigh=timedelta(days=args.high),
check_ocsp=args.check_ocsp,
timeout=args.timeout,
)
except CertificateValidationError as error:
writerow([str(service), str(error)])
if not args.csv and args.attention:
print("\a")
else:
if args.verbose:
writerow([str(service), "OK"])
if __name__ == "__main__":
main()