View Javadoc
1   package com.acumenvelocity.ath.gcs;
2   
3   import java.io.File;
4   import java.io.FileInputStream;
5   import java.io.InputStream;
6   import java.net.URI;
7   import java.nio.ByteBuffer;
8   import java.nio.channels.Channels;
9   import java.util.ArrayList;
10  import java.util.List;
11  
12  import com.acumenvelocity.ath.common.Const;
13  import com.acumenvelocity.ath.common.Log;
14  import com.acumenvelocity.ath.common.exception.AthException;
15  import com.google.api.gax.paging.Page;
16  import com.google.auth.Credentials;
17  import com.google.auth.oauth2.GoogleCredentials;
18  import com.google.cloud.ReadChannel;
19  import com.google.cloud.WriteChannel;
20  import com.google.cloud.storage.Blob;
21  import com.google.cloud.storage.BlobId;
22  import com.google.cloud.storage.BlobInfo;
23  import com.google.cloud.storage.Storage;
24  import com.google.cloud.storage.Storage.BlobListOption;
25  import com.google.cloud.storage.StorageOptions;
26  
27  public class AthStorage {
28  
29    private static Storage gcs;
30  
31    public static void init() throws Exception {
32      Credentials credentials = GoogleCredentials
33          .fromStream(new FileInputStream(Const.ATH_GCP_SECRET_FILE));
34  
35      gcs = StorageOptions.newBuilder().setCredentials(credentials).build()
36          .getService();
37    }
38  
39    public static boolean exists(URI gcsUrl) {
40      try {
41        BlobId blobId = BlobId.fromGsUtilUri(gcsUrl.toString());
42        Blob blob = gcs.get(blobId);
43  
44        return blob != null;
45  
46      } catch (Exception e) {
47        return false;
48      }
49    }
50  
51    // public static void storeFile(URI gcsUrl, String contentType, File file) throws Exception {
52    // // BlobId blobId = BlobId.of(Const.ATH_GCS_BUCKET, objectName);
53    // BlobId blobId = BlobId.fromGsUtilUri(gcsUrl.toString());
54    // BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType(contentType).build();
55    //
56    // try (InputStream inputStream = new FileInputStream(file)) {
57    // try (WriteChannel writer = gcs.writer(blobInfo)) {
58    // ByteStreams.copy(inputStream, Channels.newOutputStream(writer));
59    // }
60    // }
61    // }
62  
63    public static void storeFile(URI gcsUrl, String contentType, File file) throws Exception {
64      BlobId blobId = BlobId.fromGsUtilUri(gcsUrl.toString());
65  
66      // FIX 1: Remove setSize(Long) as it's not visible/available
67      BlobInfo blobInfo = BlobInfo.newBuilder(blobId)
68          .setContentType(contentType)
69          .build();
70  
71      long fileLength = file.length();
72  
73      // Check input file size
74      if (fileLength == 0) {
75        // Use your AthException or throw standard exception
76        throw new Exception("Input file is 0 bytes. Cannot store to GCS.");
77      }
78  
79      // FIX 2: Declare totalBytesCopied before the try-block
80      // to ensure it's accessible for the final check.
81      long totalBytesCopied = 0;
82  
83      try (InputStream inputStream = new FileInputStream(file)) {
84  
85        // Use the WriteChannel directly for binary data transfer
86        try (WriteChannel writer = gcs.writer(blobInfo)) {
87  
88          byte[] buffer = new byte[8192];
89          int bytesRead;
90  
91          while ((bytesRead = inputStream.read(buffer)) != -1) {
92            // Write to the GCS WriteChannel using a ByteBuffer
93            writer.write(ByteBuffer.wrap(buffer, 0, bytesRead));
94            totalBytesCopied += bytesRead;
95          }
96  
97          // The try-with-resources block closes the writer, which finalizes the upload.
98        }
99      }
100 
101     // FIX 3: totalBytesCopied is now accessible here
102     if (totalBytesCopied != fileLength) {
103       System.err.println("GCS upload warning: Copied bytes (" + totalBytesCopied +
104           ") does not match input file size (" + fileLength + ")");
105       // This mismatch is a strong indicator of the 0-byte issue persisting.
106     }
107   }
108 
109   public static void loadFile(URI gcsUrl, File file) throws Exception {
110     // BlobId blobId = BlobId.of(Const.ATH_GCS_BUCKET, objectName);
111     BlobId blobId = BlobId.fromGsUtilUri(gcsUrl.toString());
112     Blob blob = gcs.get(blobId);
113 
114     if (blob == null) {
115       AthException.logAndThrow(AthStorage.class, "Cannot acccess GCS blob '{}'", blobId);
116     }
117 
118     blob.downloadTo(file.toPath());
119   }
120 
121   public static InputStream getInputStream(URI gcsUrl) throws Exception {
122     // BlobId blobId = BlobId.of(Const.ATH_GCS_BUCKET, objectName);
123     BlobId blobId = BlobId.fromGsUtilUri(gcsUrl.toString());
124     Blob blob = gcs.get(blobId);
125 
126     if (blob == null) {
127       AthException.logAndThrow(AthStorage.class, "Cannot acccess GCS blob '{}'", blobId);
128     }
129 
130     ReadChannel channel = blob.reader();
131     return Channels.newInputStream(channel);
132   }
133 
134   public static Storage getGcs() {
135     return gcs;
136   }
137 
138   public static List<String> getNames(String bucketName, String prefix) throws Exception {
139     List<String> list = new ArrayList<>();
140 
141     Page<Blob> blobs = gcs.list(bucketName, BlobListOption.prefix(prefix));
142 
143     for (Blob blob : blobs.iterateAll()) {
144       list.add(blob.getName());
145     }
146 
147     return list;
148   }
149 
150   public static void deleteMany(String bucketName, String prefix) throws Exception {
151     Page<Blob> blobs = gcs.list(bucketName, BlobListOption.prefix(prefix));
152 
153     for (Blob blob : blobs.iterateAll()) {
154       Log.info(AthStorage.class, "Deleting GCS blob '{}'", blob.getName());
155       blob.delete();
156     }
157   }
158 }