rc/bin/work
2020-11-11 04:56:52 +01:00

297 lines
9.5 KiB
Python
Executable file

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import notify
from signal import signal, SIGINT
import sys
from time import sleep
from os.path import expanduser, join
from datetime import datetime, timedelta
TIME_FILE = join(expanduser("~"), "timekeeping.csv")
TODAY_FILE = join(expanduser("~"), ".timekeeping")
is_on_break = False
today_fields = []
start_time = datetime.now()
def die(*args):
print("Error: {}".format(*args), file=sys.stderr)
sys.exit(1)
def log(*args):
print(*args, file=sys.stderr)
def signal_handler(signal, frame):
global today_fields
now = datetime.now()
hour = now.strftime("%H:%M")
if not is_on_break:
sys.exit(0)
else:
break_time = now - start_time
log("\nBreak ended at {} and lasted {}".format(
hour, td_format(break_time)))
today_fields.append(start_time.strftime("%H:%M"))
today_fields.append(hour)
with open(TODAY_FILE, "w") as f:
f.write(",".join(today_fields))
sys.exit(0)
def td_format(td):
prefix = ""
if td < timedelta(0):
td = -td
prefix = "-"
hours, remainder = divmod(td.total_seconds(), 3600)
minutes, seconds = divmod(remainder, 60)
return '{}{:d}:{:02d}'.format(prefix, int(hours), int(minutes))
def get_today_fields():
try:
with open(TODAY_FILE, "r") as f:
line_count = 0
for line in f:
line_count += 1
if line_count > 1:
log("Warning: incomplete day leftover in {}".format(TODAY_FILE))
if line.startswith(start_time.strftime("%Y-%m-%d")):
return line.strip().split(",")
except FileNotFoundError:
return []
return []
def work(args):
total_delta = timedelta()
month_delta = timedelta()
count = 0
month_count = 0
with open(TIME_FILE, "r") as f:
for line in f:
fields = line.strip().split(",", 5)
date = datetime.strptime(fields[0], "%Y-%m-%d")
t = datetime.strptime(fields[4], "%H:%M")
delta = timedelta(hours=t.hour, minutes=t.minute)
if date.month == start_time.month:
month_delta += delta
month_count += 1
total_delta += delta
count += 1
eight_h = timedelta(hours=8) * count
mean_time = total_delta / count if count > 0 else total_delta
minutes = total_delta - eight_h
month_mean_time = month_delta / month_count if month_count > 0 else month_delta
month_min = month_delta - timedelta(hours=(8 * month_count))
print("stats: {}, {}".format(
td_format(mean_time),
td_format(minutes)))
print("month stats: {}, {}".format(
td_format(month_mean_time),
td_format(month_min)))
# Try to calculate remaining time
fields = get_today_fields()
hour = start_time.strftime("%H:%M")
if not fields:
print("No work ongoing")
return
fields.append(hour)
# Test for even number of timestamp (but fields[0] is the date)
if len(fields) < 3:
die("not enough fields in {}".format(TODAY_FILE))
elif len(fields) % 2 == 0:
# Break in progress
log("Break in progress")
return
begin_time = None
worked_time = timedelta()
for field in fields[1:]:
try:
if begin_time is None:
begin_time = datetime.strptime(field, "%H:%M")
else:
end_time = datetime.strptime(field, "%H:%M")
worked_time += end_time - begin_time
begin_time = None
except ValueError:
die("couldn't parse field '{}' in {}".format(
field, TODAY_FILE))
day_start = datetime.strptime(fields[1], "%H:%M")
day_end = datetime.strptime(fields[-1], "%H:%M")
total_time = day_end - day_start
break_time = total_time - worked_time
day_end_estimation = day_start + timedelta(hours=8) + break_time
print("Worked {} already today. Estimated leave at {}".format(
td_format(worked_time),
day_end_estimation.strftime("%H:%M")))
def work_start(args):
fields = get_today_fields()
try:
hour = datetime.strptime(args.time, "%H:%M")
except ValueError:
die("Error: {} is not a valid time".format(args.time))
if fields:
die("You already started working")
with open(TODAY_FILE, "a") as f:
f.write("{},{}".format(
start_time.strftime("%Y-%m-%d"),
hour.strftime("%H:%M")))
log("Started working at {}".format(hour.strftime("%H:%M")))
def work_pause(args):
global is_on_break
global today_fields
is_on_break = True
today_fields = get_today_fields()
hour = start_time.strftime("%H:%M")
if not today_fields:
die("no work to take a break from")
log("Taking a break at {}".format(hour))
notify.init("work")
# Wait to be stopped by a Ctrl-C
reminder_interval = 5 # In minutes
count = 0
while True:
sleep(reminder_interval * 60)
count += 1
notify.send("Pause reminder",
"It has now been {} minutes".format(count * reminder_interval))
def work_end(args):
fields = get_today_fields()
hour = start_time.strftime("%H:%M")
if not fields:
die("Why try to leave when you haven't even started")
fields.append(hour)
if len(fields) < 3:
die("not enough fields in {}".format(TODAY_FILE))
# Test for even number of timestamp (but fields[0] is the date)
elif len(fields) % 2 == 0:
die("odd number of timestamps in {}".format(TODAY_FILE))
begin_time = None
worked_time = timedelta()
for field in fields[1:]:
try:
if begin_time is None:
begin_time = datetime.strptime(field, "%H:%M")
else:
end_time = datetime.strptime(field, "%H:%M")
worked_time += end_time - begin_time
begin_time = None
except ValueError:
die("couldn't parse field '{}' in {}".format(
field, TODAY_FILE))
day_start = datetime.strptime(fields[1], "%H:%M")
day_end = datetime.strptime(fields[-1], "%H:%M")
total_time = day_end - day_start
break_time = total_time - worked_time
with open(TIME_FILE, "a") as f:
f.write('{},{},{},{},{},"{}"\n'.format(
fields[0],
day_start.strftime("%H:%M"),
start_time.strftime("%H:%M"),
td_format(break_time),
td_format(worked_time),
args.description))
# Erase TODAY_FILE
with open(TODAY_FILE, "w") as f:
f.write("")
f.flush()
log("Finished working at {} after working {}".format(
hour, td_format(worked_time)))
def work_export(args):
print("export")
def work_parse(args):
new_lines = []
try:
with open(args.file, "r") as f:
for line in f:
fields = line.strip().split(",")
if len(fields) < 6:
log("Record: '{}' hasn't got enough fields ({})".format(line, len(fields)))
continue
times = []
for field in fields[1:]:
try:
time = datetime.strptime(field, "%H:%M")
times.append(time)
except ValueError:
break
if len(times) % 2 != 0:
die("Error: uneven number of timestamps ({}) in line '{}'".format(len(times), line))
desc = ','.join(fields[len(times)+1:]).strip()
worked_time = timedelta()
for i in range(int(len(times) / 2)):
worked_time += times[i * 2 + 1] - times[i * 2]
morning = times[0]
evening = times[-1]
full_day = evening - morning
break_time = full_day - worked_time
new_lines.append('{},{},{},{},{},{}'.format(
fields[0],
morning.strftime("%H:%M"),
evening.strftime("%H:%M"),
td_format(break_time),
td_format(worked_time),
desc))
except FileNotFoundError:
die("file not found: {}".format(args.file))
# TODO do sanity checking, like if a day already exists
with open(TIME_FILE, "a") as f:
for line in new_lines:
f.write("{}\n".format(line))
log("Written {} new entries to {}".format(len(new_lines), TIME_FILE))
if __name__ == "__main__":
# Handle Ctrl-C
signal(SIGINT, signal_handler)
parser = argparse.ArgumentParser()
parser.set_defaults(func=work)
commands = parser.add_subparsers(dest="command")
start_parser = commands.add_parser("start")
pause_parser = commands.add_parser("pause")
end_parser = commands.add_parser("end")
export_parser = commands.add_parser("export")
parse_parser = commands.add_parser("parse")
start_parser.set_defaults(func=work_start)
start_parser.add_argument("time", nargs="?", default=datetime.now().strftime("%H:%M"))
pause_parser.set_defaults(func=work_pause)
end_parser.set_defaults(func=work_end)
end_parser.add_argument("description")
export_parser.set_defaults(func=work_export)
parse_parser.set_defaults(func=work_parse)
parse_parser.add_argument("file")
args = parser.parse_args()
args.func(args)
#now = datetime.now()
#with open(TIME_FILE, "r") as f:
# for line in f:
# print(line.strip())