# frozen_string_literal: true

require 'net/http'
require 'uri'
require 'open3'
require 'fileutils'
require 'open-uri'
require 'shellwords'

# Helpers for downloading an archive and unpacking it with the system tools
# (tar, unzip, gzip, unzstd, xz).
#
# Formats are identified by the strings 'gzip', 'zip', 'zstd' and 'xz'.
module Archive
  # Leading bytes of each supported format, mapped to the format name.
  MAGIC_NUMBERS = {
    "\x1F\x8B" => 'gzip',
    "\x50\x4B\x03\x04" => 'zip',
    "\x28\xB5\x2F\xFD" => 'zstd',
    "\xFD\x37\x7A\x58" => 'xz'
  }.freeze

  # Command (argv form) that decompresses a file of the given format to stdout.
  DECOMPRESSORS = {
    'gzip' => %w[gzip -dc],
    'zstd' => %w[unzstd -c],
    'xz' => %w[xz -dc]
  }.freeze

  # Number of leading bytes read when sniffing the format of a file.
  MAGIC_LENGTH = 4

  # A POSIX tar header carries the string "ustar" at byte offset 257.
  TAR_MAGIC = 'ustar'
  TAR_MAGIC_OFFSET = 257

  # Unpacks a local archive into +destination_dir+.
  #
  # The format is taken from the file extension, falling back to the file's
  # leading bytes.
  #
  # @param archive_path [String] path of the archive on disk
  # @param destination_dir [String] directory to unpack into (created if missing)
  # @return [Boolean, nil] result of the extraction command(s)
  # @raise [RuntimeError] if the format cannot be determined or is unsupported
  def self.extract(archive_path, destination_dir)
    format = detect_format_from_url(archive_path) || detect_format_from_magic(archive_path)
    extract_archive(archive_path, destination_dir, format)
  end

  # Downloads +url+ (unless already present) and unpacks it into
  # +destination_dir+.
  #
  # @param url [String] HTTP(S) URL of the archive
  # @param cache_path [String, nil] directory where the downloaded archive is
  #   kept; +destination_dir+ is used when nil
  # @param destination_dir [String] directory to unpack into
  # @return [Boolean, nil] result of the extraction command(s)
  # @raise [ArgumentError] if the URL path does not end in a file name
  # @raise [RuntimeError] if the archive format cannot be determined
  def self.fetch_and_extract(url, cache_path, destination_dir)
    uri = URI.parse(url)
    file_name = File.basename(uri.path.to_s)
    raise ArgumentError, "URL does not name a file: #{url}" if file_name.empty? || file_name == '/'

    archive_path = File.join(cache_path || destination_dir, file_name)
    download_file(uri, archive_path)

    # Cheapest check first; the magic bytes are local and authoritative, so the
    # extra HTTP round trip for headers is only a last resort.
    format = detect_format_from_url(uri.path) ||
             detect_format_from_magic(archive_path) ||
             detect_format_from_headers(uri)
    raise 'Could not determine archive format' unless format

    extract_archive(archive_path, destination_dir, format)
  end

  # Downloads +uri+ to +output_path+.
  #
  # Data is written to "<output_path>.part" and renamed once the transfer is
  # complete, so an interrupted download is never mistaken for a cached file.
  #
  # @param uri [URI, String] location to download
  # @param output_path [String] where to store the file
  # @param forced [Boolean] download even if +output_path+ already exists
  # @return [Integer, nil] number of bytes copied, or nil when the download was
  #   skipped because the file already exists
  def self.download_file(uri, output_path, forced = false)
    return if File.exist?(output_path) && !forced

    FileUtils.mkdir_p(File.dirname(output_path))
    partial_path = "#{output_path}.part"
    begin
      copied = URI.open(uri) do |input|
        File.open(partial_path, 'wb') { |output| IO.copy_stream(input, output) }
      end
      File.rename(partial_path, output_path)
      copied
    ensure
      # No-op after a successful rename; removes the leftover on failure.
      FileUtils.rm_f(partial_path)
    end
  end

  # NOTE: the helpers below were written under a bare `private`, which has no
  # effect on `def self.` methods. They are left public so that existing
  # callers keep working.

  # Guesses the format from a file extension (case-insensitive).
  #
  # @param url [String] URL path or file path
  # @return [String, nil] format name, or nil if the extension is unknown
  def self.detect_format_from_url(url)
    case File.extname(url.to_s).downcase
    when '.gz', '.tgz' then 'gzip'
    when '.zip' then 'zip'
    when '.zst' then 'zstd'
    when '.xz' then 'xz'
    end
  end

  # Guesses the format from the Content-Type header of a HEAD request, so the
  # archive body is not transferred a second time.
  #
  # @param uri [URI::HTTP] location of the archive
  # @return [String, nil] format name, or nil if the header is missing or
  #   does not mention a supported format
  def self.detect_format_from_headers(uri)
    response = Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https') do |http|
      http.head(uri.request_uri)
    end

    # /gzip/ must be tested before /zip/, which it also matches.
    case response['content-type']
    when /gzip/ then 'gzip'
    when /zip/ then 'zip'
    when /zstd/ then 'zstd'
    when /xz/ then 'xz'
    end
  end

  # Guesses the format from the first bytes of a file.
  #
  # @param file_path [String] path of the file to inspect
  # @return [String, nil] format name, or nil for an empty or unknown file
  def self.detect_format_from_magic(file_path)
    header = File.binread(file_path, MAGIC_LENGTH)
    return nil if header.nil? || header.empty?

    # The table's literals are UTF-8 strings holding invalid byte sequences;
    # comparing them with binary file data raises Encoding::CompatibilityError,
    # so compare binary copies instead.
    MAGIC_NUMBERS.each do |magic, format|
      return format if header.start_with?(magic.b)
    end
    nil
  end

  # Unpacks +file_path+ of the given +format+ into +destination_dir+.
  #
  # zip archives are unpacked with unzip. gzip/zstd/xz files are unpacked
  # with tar when they contain a tarball; otherwise the single decompressed
  # file is written into +destination_dir+.
  #
  # @param file_path [String] path of the archive
  # @param destination_dir [String] directory to unpack into (created if missing)
  # @param format [String] one of 'gzip', 'zip', 'zstd', 'xz'
  # @return [Boolean, nil] true on success, false on failure, nil if a tool
  #   could not be executed
  # @raise [RuntimeError] for any other +format+
  def self.extract_archive(file_path, destination_dir, format)
    FileUtils.mkdir_p(destination_dir)

    case format
    when 'zip'
      system('unzip', '-d', destination_dir, file_path)
    when 'gzip', 'zstd', 'xz'
      extract_compressed_stream(file_path, destination_dir, DECOMPRESSORS.fetch(format))
    else
      raise "Unsupported archive format: #{format}"
    end
  end

  # Unpacks a stream-compressed file, via tar if the payload is a tarball.
  def self.extract_compressed_stream(file_path, destination_dir, decompressor)
    command = [*decompressor, file_path]

    if tar_payload?(command)
      statuses = Open3.pipeline(command, ['tar', '-xf', '-', '-C', destination_dir])
      statuses.all?(&:success?)
    else
      target = decompressed_path(file_path, destination_dir)
      succeeded = system(*command, out: target)
      # Do not leave an empty or truncated file behind on failure.
      FileUtils.rm_f(target) unless succeeded
      succeeded
    end
  rescue Errno::ENOENT
    # Mirror Kernel#system, which returns nil when a command cannot be run.
    nil
  end

  # Reports whether the output of +command+ starts with a tar header.
  def self.tar_payload?(command)
    wanted = TAR_MAGIC_OFFSET + TAR_MAGIC.bytesize
    header = IO.popen(command, 'rb', err: File::NULL) { |io| io.read(wanted) }
    !header.nil? && header.byteslice(TAR_MAGIC_OFFSET, TAR_MAGIC.bytesize) == TAR_MAGIC
  rescue SystemCallError
    false
  end

  # Path inside +destination_dir+ for the decompressed form of +file_path+.
  def self.decompressed_path(file_path, destination_dir)
    base = File.basename(file_path)
    stripped = base.sub(/\.(?:gz|zst|xz)\z/i, '')
    # Never reuse the archive's own name: the archive may live in the
    # destination directory and would be overwritten while being read.
    stripped = "#{base}.out" if stripped.empty? || stripped == base
    File.join(destination_dir, stripped)
  end

  private_class_method :extract_compressed_stream, :tar_payload?, :decompressed_path
end

# Example usage:
#   Archive.fetch_and_extract(
#     "https://codeberg.org/forgejo/forgejo/releases/download/v12.0.1/forgejo-12.0.1-linux-amd64.xz",
#     "/tmp/archive-cache",
#     "/tmp/forgejo"
#   )