refactor: link controller buffered channel to hold click data
This commit is contained in:
+87
-20
@@ -4,6 +4,82 @@ module App::Controllers
|
||||
include App::Lib
|
||||
include App::Services
|
||||
|
||||
# Buffered channel to hold click data
|
||||
@@click_channel = Channel(NamedTuple(
|
||||
link_id: Int64,
|
||||
remote_address: String,
|
||||
user_agent: String?,
|
||||
referer: String
|
||||
)).new(10000) # Buffer size
|
||||
|
||||
@@processor_started = begin
|
||||
spawn do
|
||||
batch_size = 125
|
||||
batch = [] of NamedTuple(
|
||||
link_id: Int64,
|
||||
remote_address: String,
|
||||
user_agent: String?,
|
||||
referer: String
|
||||
)
|
||||
|
||||
loop do
|
||||
select
|
||||
when click_data = @@click_channel.receive
|
||||
batch << click_data
|
||||
|
||||
# Collect clicks until we have a batch or a timeout
|
||||
if batch.size >= batch_size
|
||||
process_click_batch(batch)
|
||||
batch.clear
|
||||
end
|
||||
when timeout(0.5.seconds)
|
||||
# Process whatever we have after timeout
|
||||
unless batch.empty?
|
||||
process_click_batch(batch)
|
||||
batch.clear
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
true
|
||||
end
|
||||
|
||||
private def self.process_click_batch(batch)
|
||||
clicks = [] of App::Models::Click
|
||||
|
||||
batch.each do |click_data|
|
||||
begin
|
||||
client_ip = IpLookup.ip_from_address(click_data[:remote_address])
|
||||
family, _, _, os = UserAgent.parse(click_data[:user_agent] || "")
|
||||
|
||||
click = App::Models::Click.new
|
||||
click.link_id = click_data[:link_id]
|
||||
click.country = client_ip ? IpLookup.country(client_ip) : nil
|
||||
click.user_agent = click_data[:user_agent]
|
||||
click.browser = family
|
||||
click.os = os.try &.[0] # OS family
|
||||
click.referer = click_data[:referer]
|
||||
|
||||
clicks << click
|
||||
rescue ex
|
||||
Log.error { "Click data processing error: #{ex.message}" }
|
||||
end
|
||||
end
|
||||
|
||||
# Batch insert clicks if any were successfully processed
|
||||
unless clicks.empty?
|
||||
begin
|
||||
multi = Crecto::Multi.new
|
||||
clicks.each do |click|
|
||||
multi.insert(click)
|
||||
end
|
||||
Database.transaction(multi)
|
||||
rescue ex
|
||||
Log.error { "Batch click insertion error: #{ex.message}" }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.redirect_handler
|
||||
->(env : HTTP::Server::Context) {
|
||||
link_id, url = Database.raw_query("SELECT id, url FROM links WHERE slug = (?) LIMIT 1", env.params.url["slug"]) do |result|
|
||||
@@ -16,26 +92,17 @@ module App::Controllers
|
||||
env.response.headers.add("Location", url)
|
||||
env.response.headers.add("X-Forwarded-For", remote_address)
|
||||
|
||||
spawn do
|
||||
begin
|
||||
client_ip = IpLookup.ip_from_address(remote_address)
|
||||
user_agent_str = env.request.headers["User-Agent"]?
|
||||
referer = env.request.headers["Referer"]?.try { |r| URI.parse(r).host rescue r } || env.params.query["utm_source"]? || "Direct"
|
||||
|
||||
family, _, _, os = user_agent_str ? UserAgent.parse(user_agent_str) : {nil, nil, nil, nil}
|
||||
|
||||
click = App::Models::Click.new
|
||||
click.link_id = link_id
|
||||
click.country = client_ip ? IpLookup.country(client_ip) : nil
|
||||
click.user_agent = user_agent_str
|
||||
click.browser = family
|
||||
click.os = os.try &.[0] # Access the first element of the os tuple (the family)
|
||||
click.referer = referer
|
||||
|
||||
Database.insert(click)
|
||||
rescue ex
|
||||
Log.error { "Click tracking error: #{ex.message}" }
|
||||
end
|
||||
begin
|
||||
@@click_channel.send({
|
||||
link_id: link_id,
|
||||
remote_address: remote_address,
|
||||
user_agent: env.request.headers["User-Agent"]?,
|
||||
referer: env.request.headers["Referer"]?.try { |r| URI.parse(r).host rescue r } || env.params.query["utm_source"]? || "Direct"
|
||||
})
|
||||
rescue Channel::ClosedError
|
||||
Log.error { "Click channel closed" }
|
||||
rescue ex
|
||||
Log.error { "Error queuing click: #{ex.message}" }
|
||||
end
|
||||
}
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user